频率增强通道注意力机制(FECAM)学习总结

本文提出了一种新的频率增强通道注意力机制(FECAM),旨在解决时间序列预测中傅里叶变换因吉布斯现象导致的高频噪声问题。FECAM基于离散余弦变换,能自适应地模拟信道间的频率依赖性,有效避免预测误差。实验显示,该机制可提升多种主流模型的预测性能,如LSTM、Reformer、Informer和Autoformer等。


时间序列预测是一个长期存在的挑战,因为真实世界的信息处于各种场景(例如,能源,天气,交通,经济,地震预警)。然而,一些主流的预测模型预测结果与实际情况大相径庭。我们认为,这是模型缺乏捕获真实世界数据集中丰富包含的频率信息的能力的原因。目前主流的频率信息提取方法都是基于傅里叶变换(FT)的。然而,由于吉布斯现象,FT的使用是有问题的。如果序列两侧的值差异很大,则在两侧周围观察到振荡近似,并且会引入高频噪声。因此,我们提出了一种新的频率增强信道注意力,它基于离散余弦变换自适应地模拟信道之间的频率相互依赖性,这将从本质上避免傅里叶变换过程中有问题的周期引起的高频噪声,这被定义为吉布斯现象。我们证明了该网络在六个真实数据集中的泛化非常有效,并实现了最先进的性能,我们进一步证明了频率增强信道注意力机制模块可以灵活地应用于不同的网络。该模块可以提高现有主流网络的预测能力,只需几行代码,即可在LSTM上降低35.99%的MSE,在Reformer上降低10.01%,在Informer上降低8.71%,在Autoformer上降低8.29%,在Transformer上降低8.06%等,计算成本很小。

这篇论文提出了一种基于离散余弦变换的频率增强通道注意机制(FECAM),通过捕捉时间序列中的频率信息来提升主流深度学习模型的预测能力,有效避免傅里叶变换引入的高频噪声,显著提升了多种时间序列预测任务的性能。

SENET(channel attention)

FECAM(Frequency Enhanced Channel Attention Mechanism)

As a module to enhance the frequency domain modeling capability of transformers and LSTM

 下面对其部分源码进行解读:

SlefAttention_Family.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as Fimport matplotlib.pyplot as pltimport numpy as np
import math
from math import sqrt
from utils.masking import TriangularCausalMask, ProbMask
# from masking import TriangularCausalMask, ProbMask#for parameter count
import osclass FullAttention(nn.Module):def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):super(FullAttention, self).__init__()self.scale = scaleself.mask_flag = mask_flagself.output_attention = output_attentionself.dropout = nn.Dropout(attention_dropout)def forward(self, queries, keys, values, attn_mask):B, L, H, E = queries.shape_, S, _, D = values.shapescale = self.scale or 1. / sqrt(E)scores = torch.einsum("blhe,bshe->bhls", queries, keys)if self.mask_flag:if attn_mask is None:attn_mask = TriangularCausalMask(B, L, device=queries.device)scores.masked_fill_(attn_mask.mask, -np.inf)A = self.dropout(torch.softmax(scale * scores, dim=-1))V = torch.einsum("bhls,bshd->blhd", A, values)if self.output_attention:return (V.contiguous(), A)else:return (V.contiguous(), None)class ProbAttention(nn.Module):def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):super(ProbAttention, self).__init__()self.factor = factorself.scale = scaleself.mask_flag = mask_flagself.output_attention = output_attentionself.dropout = nn.Dropout(attention_dropout)def _prob_QK(self, Q, K, sample_k, n_top):  # n_top: c*ln(L_q)# Q [B, H, L, D]B, H, L_K, E = K.shape_, _, L_Q, _ = Q.shape# calculate the sampled Q_KK_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)index_sample = torch.randint(L_K, (L_Q, sample_k))  # real U = U_part(factor*ln(L_k))*L_qK_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze()# find the Top_k query with sparisty measurementM = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K)M_top = M.topk(n_top, sorted=False)[1]# use the reduced Q to calculate Q_KQ_reduce = Q[torch.arange(B)[:, None, None],torch.arange(H)[None, :, None],M_top, :]  # factor*ln(L_q)Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1))  # factor*ln(L_q)*L_kreturn Q_K, M_topdef _get_initial_context(self, V, L_Q):B, H, L_V, D = V.shapeif not self.mask_flag:# V_sum = V.sum(dim=-2)V_sum = V.mean(dim=-2)contex = V_sum.unsqueeze(-2).expand(B, H, L_Q, V_sum.shape[-1]).clone()else:  # use maskassert (L_Q == L_V)  # requires that L_Q == L_V, i.e. for self-attention onlycontex = V.cumsum(dim=-2)return contexdef _update_context(self, context_in, V, scores, index, L_Q, attn_mask):B, H, L_V, D = V.shapeif self.mask_flag:attn_mask = ProbMask(B, H, L_Q, index, scores, device=V.device)scores.masked_fill_(attn_mask.mask, -np.inf)attn = torch.softmax(scores, dim=-1)  # nn.Softmax(dim=-1)(scores)context_in[torch.arange(B)[:, None, None],torch.arange(H)[None, :, None],index, :] = torch.matmul(attn, V).type_as(context_in)if self.output_attention:attns = (torch.ones([B, H, L_V, L_V]) / L_V).type_as(attn).to(attn.device)attns[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], index, :] = attnreturn (context_in, attns)else:return (context_in, None)def forward(self, queries, keys, values, attn_mask):B, L_Q, H, D = queries.shape_, L_K, _, _ = keys.shapequeries = queries.transpose(2, 1)keys = keys.transpose(2, 1)values = values.transpose(2, 1)U_part = self.factor * np.ceil(np.log(L_K)).astype('int').item()  # c*ln(L_k)u = self.factor * np.ceil(np.log(L_Q)).astype('int').item()  # c*ln(L_q)U_part = U_part if U_part < L_K else L_Ku = u if u < L_Q else L_Qscores_top, index = self._prob_QK(queries, keys, sample_k=U_part, n_top=u)# add scale factorscale = self.scale or 1. / sqrt(D)if scale is not None:scores_top = scores_top * scale# get the contextcontext = self._get_initial_context(values, L_Q)# update the context with selected top_k queriescontext, attn = self._update_context(context, values, scores_top, index, L_Q, attn_mask)return context.contiguous(), attnclass AttentionLayer(nn.Module):def __init__(self, attention, d_model, n_heads, d_keys=None,d_values=None):super(AttentionLayer, self).__init__()d_keys = d_keys or (d_model // n_heads)d_values = d_values or (d_model // n_heads)self.inner_attention = attentionself.query_projection = nn.Linear(d_model, d_keys * n_heads)self.key_projection = nn.Linear(d_model, d_keys * n_heads)self.value_projection = nn.Linear(d_model, d_values * n_heads)self.out_projection = nn.Linear(d_values * n_heads, d_model)self.n_heads = n_headsdef forward(self, queries, keys, values, attn_mask):B, L, _ = queries.shape_, S, _ = keys.shapeH = self.n_headsqueries = self.query_projection(queries).view(B, L, H, -1)keys = self.key_projection(keys).view(B, S, H, -1)values = self.value_projection(values).view(B, S, H, -1)out, attn = self.inner_attention(queries,keys,values,attn_mask)out = out.view(B, L, -1)return self.out_projection(out), attn

这个代码实现了一个注意力机制模块,主要包括了两种注意力机制(全局注意力 FullAttention 和 概率注意力 ProbAttention),以及一个常规的 AttentionLayer 用于处理多头注意力。以下是详细的解释:

1. 基础导入

import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import math
from math import sqrt
from utils.masking import TriangularCausalMask, ProbMask
  • torchtorch.nntorch.nn.functional 是 PyTorch 的核心模块,用于定义和操作神经网络结构。
  • matplotlib.pyplotnumpy 是常用的 Python 库,用于绘图和数值计算。
  • TriangularCausalMaskProbMask 用于为注意力机制提供掩码(masking),以确保特定位置的注意力权重被正确计算。

2. FullAttention 类(全局注意力)

class FullAttention(nn.Module):def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):super(FullAttention, self).__init__()self.scale = scaleself.mask_flag = mask_flagself.output_attention = output_attentionself.dropout = nn.Dropout(attention_dropout)
  • FullAttention 实现了全局自注意力机制。它通过 querieskeys 计算注意力权重。
  • mask_flag:决定是否应用掩码。
  • scale:对注意力分数进行缩放。
  • attention_dropout:为防止过拟合,应用 Dropout。

forward 函数:

def forward(self, queries, keys, values, attn_mask):B, L, H, E = queries.shape_, S, _, D = values.shapescale = self.scale or 1. / sqrt(E)scores = torch.einsum("blhe,bshe->bhls", queries, keys)if self.mask_flag:if attn_mask is None:attn_mask = TriangularCausalMask(B, L, device=queries.device)scores.masked_fill_(attn_mask.mask, -np.inf)A = self.dropout(torch.softmax(scale * scores, dim=-1))V = torch.einsum("bhls,bshd->blhd", A, values)return (V.contiguous(), A if self.output_attention else None)
  • torch.einsum:使用爱因斯坦求和约定来进行矩阵乘法,计算 querieskeys 的相似度。
  • mask_flag:当使用掩码时,将填充 -np.inf 来忽略特定位置的注意力分数。
  • softmax:用于将分数转换为概率分布。
  • V:是使用注意力分数和 values 计算出的加权和。

3. ProbAttention 类(概率注意力)

class ProbAttention(nn.Module):def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):super(ProbAttention, self).__init__()self.factor = factorself.scale = scaleself.mask_flag = mask_flagself.output_attention = output_attentionself.dropout = nn.Dropout(attention_dropout)
  • ProbAttention 实现了一种基于稀疏注意力的机制。通过概率的方法减少计算量,特别适用于长序列的处理。
  • factor:控制采样和稀疏度。
  • scaleattention_dropout 类似于 FullAttention 中的作用。

_prob_QK 函数:

def _prob_QK(self, Q, K, sample_k, n_top):K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)index_sample = torch.randint(L_K, (L_Q, sample_k))K_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze()M = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K)M_top = M.topk(n_top, sorted=False)[1]Q_reduce = Q[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], M_top, :]Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1))return Q_K, M_top
  • 目的:通过抽样计算 QK 的相似度,从而减少全局计算开销。
  • 采样:通过从 K 中随机选择一部分,并与 Q 进行计算,得出最具代表性的 Top_k 查询,减少计算负担。

4. AttentionLayer 类

class AttentionLayer(nn.Module):def __init__(self, attention, d_model, n_heads, d_keys=None, d_values=None):super(AttentionLayer, self).__init__()self.inner_attention = attentionself.query_projection = nn.Linear(d_model, d_keys * n_heads)self.key_projection = nn.Linear(d_model, d_keys * n_heads)self.value_projection = nn.Linear(d_model, d_values * n_heads)self.out_projection = nn.Linear(d_values * n_heads, d_model)self.n_heads = n_heads
  • AttentionLayer 是多头注意力层,将输入的 querieskeysvalues 进行线性变换,然后通过注意力机制处理。
  • d_model:输入特征的维度。
  • n_heads:多头注意力的头数。
  • inner_attention:可以是 FullAttentionProbAttention
forward 函数:
def forward(self, queries, keys, values, attn_mask):B, L, _ = queries.shapeH = self.n_headsqueries = self.query_projection(queries).view(B, L, H, -1)keys = self.key_projection(keys).view(B, S, H, -1)values = self.value_projection(values).view(B, S, H, -1)out, attn = self.inner_attention(queries,keys,values,attn_mask)out = out.view(B, L, -1)return self.out_projection(out), attn
  • 线性变换querieskeysvalues 都通过线性层进行投影,转换为适应多头注意力的形状。
  • 多头注意力:将 querieskeysvalues 分成多个头来计算注意力。
  • 输出投影:最后通过 out_projection 投影到原始维度。

总结

  • 全局注意力FullAttention)计算每对 querieskeys 的相似度,用于标准的 Transformer 模型。
  • 概率注意力ProbAttention)通过稀疏采样减少计算复杂度,特别适用于长序列输入。
  • AttentionLayer 实现了一个多头注意力机制,可以选择全局注意力或概率注意力来处理输入。

这段代码的核心是实现不同的注意力机制,主要用于序列到序列的建模任务,例如时间序列预测或自然语言处理任务。


在 PyTorch 中,forward 函数是每个自定义神经网络模块(nn.Module)的核心方法,它定义了模型的前向传播过程。前向传播是指输入数据如何经过网络层,逐步计算出输出结果的过程。

forward 函数的主要作用:

  1. 定义模型的计算逻辑forward 函数负责具体的计算过程。每当你将输入传入模型时,模型会自动调用 forward 函数来执行从输入到输出的所有计算。

    forward 函数中,你将定义:

    • 数据如何流经每一层(如卷积层、全连接层、注意力机制等)。
    • 应用哪些激活函数、正则化(如 BatchNormDropout)等操作。
    • 最终如何产生输出。
  2. 将输入映射到输出forward 函数的核心任务是将输入数据(张量)传递到各层并生成输出。例如,在卷积神经网络中,输入张量会经过卷积层、池化层、激活函数等模块,最后输出结果。

  3. 自动执行反向传播: 虽然 forward 仅定义前向传播过程,但它与自动微分(autograd)紧密相连。当你调用 loss.backward() 时,PyTorch 会自动根据 forward 中的操作来计算梯度,进而进行反向传播和参数更新。


Transformer_EncDec.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
from layers.dctnet import dct_channel_block, dct
# from dctnet import dct_channel_block, dctclass ConvLayer(nn.Module):def __init__(self, c_in):super(ConvLayer, self).__init__()self.downConv = nn.Conv1d(in_channels=c_in,out_channels=c_in,kernel_size=3,padding=2,padding_mode='circular')self.norm = nn.BatchNorm1d(c_in)self.activation = nn.ELU()self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)def forward(self, x):x = self.downConv(x.permute(0, 2, 1))x = self.norm(x)x = self.activation(x)x = self.maxPool(x)x = x.transpose(1, 2)return xclass EncoderLayer(nn.Module):def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):super(EncoderLayer, self).__init__()d_ff = d_ff or 4 * d_modelself.attention = attentionself.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)self.norm1 = nn.LayerNorm(d_model)self.norm2 = nn.LayerNorm(d_model)self.dropout = nn.Dropout(dropout)self.activation = F.relu if activation == "relu" else F.geluself.dct_layer=dct_channel_block(512)self.dct_norm = nn.LayerNorm([512], eps=1e-6)def forward(self, x, attn_mask=None):# print(x.shape)#torch.Size([32, 96, 512])#torch.Size([32, 49, 512])'''before self-attention'''# mid  = self.dct_layer(x)## x = x+midnew_x, attn = self.attention(x, x, x,attn_mask=attn_mask)x = x + self.dropout(new_x)y = x = self.norm1(x)y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))y = self.dropout(self.conv2(y).transpose(-1, 1))# print("y.shape:",y.shape)#y.shape: torch.Size([32, 96, 512])#为了减少计算量,不然要做512次L=96的dct#加入到moduleresult = self.norm2(x + y)# mid  = self.dct_layer(result)# result = result+mid# result = self.dct_norm(result) #norm 144return result, attn# return self.norm2(x + y), attnclass Encoder(nn.Module):def __init__(self, attn_layers, conv_layers=None, norm_layer=None):super(Encoder, self).__init__()self.attn_layers = nn.ModuleList(attn_layers)self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else Noneself.norm = norm_layerdef forward(self, x, attn_mask=None):# x [B, L, D]attns = []if self.conv_layers is not None:for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):x, attn = attn_layer(x, attn_mask=attn_mask)x = conv_layer(x)attns.append(attn)x, attn = self.attn_layers[-1](x)attns.append(attn)else:for attn_layer in self.attn_layers:x, attn = attn_layer(x, attn_mask=attn_mask)attns.append(attn)if self.norm is not None:x = self.norm(x)return x, attnsclass DecoderLayer(nn.Module):def __init__(self, self_attention, cross_attention, d_model, d_ff=None,dropout=0.1, activation="relu"):super(DecoderLayer, self).__init__()d_ff = d_ff or 4 * d_modelself.self_attention = self_attentionself.cross_attention = cross_attentionself.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)self.norm1 = nn.LayerNorm(d_model)self.norm2 = nn.LayerNorm(d_model)self.norm3 = nn.LayerNorm(d_model)self.dropout = nn.Dropout(dropout)self.activation = F.relu if activation == "relu" else F.gelu# self.dct_layer=dct_channel_block(512)# self.dct_norm = nn.LayerNorm([512], eps=1e-6)def forward(self, x, cross, x_mask=None, cross_mask=None):x = x + self.dropout(self.self_attention(x, x, x,attn_mask=x_mask)[0])x = self.norm1(x)x = x + self.dropout(self.cross_attention(x, cross, cross,attn_mask=cross_mask)[0])y = x = self.norm2(x)y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))y = self.dropout(self.conv2(y).transpose(-1, 1))# result = self.norm3(x + y)# mid  = self.dct_layer(result)# result = result+mid# result = self.dct_norm(result) #norm 144# return result# return self.norm3(x + y)return self.norm3(x + y)class Decoder(nn.Module):def __init__(self, layers, norm_layer=None, projection=None):super(Decoder, self).__init__()self.layers = nn.ModuleList(layers)self.norm = norm_layerself.projection = projectiondef forward(self, x, cross, x_mask=None, cross_mask=None):for layer in self.layers:x = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask)if self.norm is not None:x = self.norm(x)if self.projection is not None:x = self.projection(x)return x

这段代码定义了一个类似 Transformer 的神经网络架构,包含卷积层、编码器层和解码器层。它可以用于处理序列数据(如时间序列、自然语言处理等)。具体包括以下组件:

1. ConvLayer(卷积层):

class ConvLayer(nn.Module):def __init__(self, c_in):super(ConvLayer, self).__init__()self.downConv = nn.Conv1d(in_channels=c_in, out_channels=c_in, kernel_size=3, padding=2, padding_mode='circular')self.norm = nn.BatchNorm1d(c_in)self.activation = nn.ELU()self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
  • 这个卷积层首先对输入进行 1D 卷积,然后进行批归一化(BatchNorm)和 ELU 激活,再通过最大池化层减少特征维度。
  • 作用:将高维的输入进行降采样,同时提取局部特征。

2. EncoderLayer(编码器层)

class EncoderLayer(nn.Module):def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):super(EncoderLayer, self).__init__()d_ff = d_ff or 4 * d_modelself.attention = attentionself.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)self.norm1 = nn.LayerNorm(d_model)self.norm2 = nn.LayerNorm(d_model)self.dropout = nn.Dropout(dropout)self.activation = F.relu if activation == "relu" else F.geluself.dct_layer = dct_channel_block(512)  # DCT增强self.dct_norm = nn.LayerNorm([512], eps=1e-6)
  • 作用:这个层结合了注意力机制、自注意力(self-attention)、卷积层(用于前馈网络),并且通过 LayerNorm 和 Dropout 进行正则化。它还引入了 DCT(离散余弦变换)来增强输入的特征处理。
  • 自注意力机制:通过 attention 来捕获不同位置之间的相关性。
  • 卷积网络:通过卷积进一步对特征进行变换和学习。

3. Encoder(编码器)

class Encoder(nn.Module):def __init__(self, attn_layers, conv_layers=None, norm_layer=None):super(Encoder, self).__init__()self.attn_layers = nn.ModuleList(attn_layers)self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else Noneself.norm = norm_layer
  • 作用:这个类是编码器的整体结构,由多个 EncoderLayer 组成。如果有卷积层(conv_layers),会在每次注意力计算后对输入进行卷积操作。
  • 流程:输入经过每一层的注意力层,最后通过 LayerNorm 进行归一化。

4. DecoderLayer(解码器层)

class DecoderLayer(nn.Module):def __init__(self, self_attention, cross_attention, d_model, d_ff=None, dropout=0.1, activation="relu"):super(DecoderLayer, self).__init__()d_ff = d_ff or 4 * d_modelself.self_attention = self_attentionself.cross_attention = cross_attentionself.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)self.norm1 = nn.LayerNorm(d_model)self.norm2 = nn.LayerNorm(d_model)self.norm3 = nn.LayerNorm(d_model)self.dropout = nn.Dropout(dropout)self.activation = F.relu if activation == "relu" else F.gelu
  • 作用:解码器层包括了自注意力(self_attention)和交叉注意力(cross_attention)。自注意力用于捕捉解码器内部的依赖关系,而交叉注意力用于将编码器的输出与解码器的输入结合。
  • 卷积层和归一化:使用卷积和归一化对注意力层的输出进一步处理。

5. Decoder(解码器)

class Decoder(nn.Module):def __init__(self, layers, norm_layer=None, projection=None):super(Decoder, self).__init__()self.layers = nn.ModuleList(layers)self.norm = norm_layerself.projection = projection
  • 作用:解码器由多个 DecoderLayer 组成。输入首先通过每一层的解码器,结合来自编码器的输出,最后通过归一化和可选的投影层生成最终的输出。

6. 前向传播 (forward) 函数

每个模块都有一个 forward 函数,定义了数据在该模块中的流动过程。

ConvLayer.forward
def forward(self, x):x = self.downConv(x.permute(0, 2, 1))x = self.norm(x)x = self.activation(x)x = self.maxPool(x)x = x.transpose(1, 2)return x
  • 操作顺序
    1. 输入经过一维卷积处理。
    2. 批归一化。
    3. 激活函数 ELU。
    4. 最大池化。
    5. 变换维度并返回输出。
EncoderLayer.forward
def forward(self, x, attn_mask=None):new_x, attn = self.attention(x, x, x, attn_mask=attn_mask)x = x + self.dropout(new_x)y = x = self.norm1(x)y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))y = self.dropout(self.conv2(y).transpose(-1, 1))result = self.norm2(x + y)return result, attn
  • 操作顺序
    1. 输入通过注意力层进行自注意力计算。
    2. 经过残差连接和 Dropout。
    3. 进行 LayerNorm 归一化。
    4. 输入通过前馈卷积网络。
    5. 最后输出经过归一化的结果。
DecoderLayer.forward
def forward(self, x, cross, x_mask=None, cross_mask=None):x = x + self.dropout(self.self_attention(x, x, x, attn_mask=x_mask)[0])x = self.norm1(x)x = x + self.dropout(self.cross_attention(x, cross, cross, attn_mask=cross_mask)[0])y = x = self.norm2(x)y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))y = self.dropout(self.conv2(y).transpose(-1, 1))return self.norm3(x + y)
  • 操作顺序
    1. 输入通过自注意力计算。
    2. 进行 LayerNorm 归一化。
    3. 再通过交叉注意力与编码器的输出结合。
    4. 经过前馈卷积层,最后输出结果。

总结

  • 卷积层 用于特征提取和降采样。
  • 编码器解码器 使用注意力机制来捕捉不同时间步之间的依赖关系,并通过卷积层进行进一步的特征处理。
  • 前向传播 函数定义了每个模块内部的数据流动和处理逻辑,结合了卷积、注意力机制和正则化。

Embed.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils import weight_norm
import mathclass PositionalEmbedding(nn.Module):def __init__(self, d_model, max_len=5000):super(PositionalEmbedding, self).__init__()# Compute the positional encodings once in log space.pe = torch.zeros(max_len, d_model).float()pe.require_grad = Falseposition = torch.arange(0, max_len).float().unsqueeze(1)div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):return self.pe[:, :x.size(1)]class TokenEmbedding(nn.Module):def __init__(self, c_in, d_model):super(TokenEmbedding, self).__init__()padding = 1 if torch.__version__ >= '1.5.0' else 2self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,kernel_size=3, padding=padding, padding_mode='circular', bias=False)for m in self.modules():if isinstance(m, nn.Conv1d):nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu')def forward(self, x):x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)return xclass FixedEmbedding(nn.Module):def __init__(self, c_in, d_model):super(FixedEmbedding, self).__init__()w = torch.zeros(c_in, d_model).float()w.require_grad = Falseposition = torch.arange(0, c_in).float().unsqueeze(1)div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()w[:, 0::2] = torch.sin(position * div_term)w[:, 1::2] = torch.cos(position * div_term)self.emb = nn.Embedding(c_in, d_model)self.emb.weight = nn.Parameter(w, requires_grad=False)def forward(self, x):return self.emb(x).detach()class TemporalEmbedding(nn.Module):def __init__(self, d_model, embed_type='fixed', freq='h'):super(TemporalEmbedding, self).__init__()minute_size = 4hour_size = 24weekday_size = 7day_size = 32month_size = 13Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embeddingif freq == 't':self.minute_embed = Embed(minute_size, d_model)self.hour_embed = Embed(hour_size, d_model)self.weekday_embed = Embed(weekday_size, d_model)self.day_embed = Embed(day_size, d_model)self.month_embed = Embed(month_size, d_model)def forward(self, x):x = x.long()minute_x = self.minute_embed(x[:, :, 4]) if hasattr(self, 'minute_embed') else 0.hour_x = self.hour_embed(x[:, :, 3])weekday_x = self.weekday_embed(x[:, :, 2])day_x = self.day_embed(x[:, :, 1])month_x = self.month_embed(x[:, :, 0])return hour_x + weekday_x + day_x + month_x + minute_xclass TimeFeatureEmbedding(nn.Module):def __init__(self, d_model, embed_type='timeF', freq='h'):super(TimeFeatureEmbedding, self).__init__()freq_map = {'h': 4, 't': 5, 's': 6, 'm': 1, 'a': 1, 'w': 2, 'd': 3, 'b': 3}d_inp = freq_map[freq]self.embed = nn.Linear(d_inp, d_model, bias=False)def forward(self, x):return self.embed(x)class DataEmbedding(nn.Module):def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):super(DataEmbedding, self).__init__()self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)self.position_embedding = PositionalEmbedding(d_model=d_model)self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(d_model=d_model, embed_type=embed_type, freq=freq)self.dropout = nn.Dropout(p=dropout)def forward(self, x, x_mark):x = self.value_embedding(x) + self.temporal_embedding(x_mark) + self.position_embedding(x)return self.dropout(x)class DataEmbedding_wo_pos(nn.Module):def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):super(DataEmbedding_wo_pos, self).__init__()self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)self.position_embedding = PositionalEmbedding(d_model=d_model)self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(d_model=d_model, embed_type=embed_type, freq=freq)self.dropout = nn.Dropout(p=dropout)def forward(self, x, x_mark):x = self.value_embedding(x) + self.temporal_embedding(x_mark)return self.dropout(x)class DataEmbedding_wo_pos_temp(nn.Module):def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):super(DataEmbedding_wo_pos_temp, self).__init__()self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)self.position_embedding = PositionalEmbedding(d_model=d_model)self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(d_model=d_model, embed_type=embed_type, freq=freq)self.dropout = nn.Dropout(p=dropout)def forward(self, x, x_mark):x = self.value_embedding(x)return self.dropout(x)class DataEmbedding_wo_temp(nn.Module):def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):super(DataEmbedding_wo_temp, self).__init__()self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)self.position_embedding = PositionalEmbedding(d_model=d_model)self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(d_model=d_model, embed_type=embed_type, freq=freq)self.dropout = nn.Dropout(p=dropout)def forward(self, x, x_mark):x = self.value_embedding(x) + self.position_embedding(x)return self.dropout(x)

这段代码实现了一系列嵌入层(Embedding Layers),主要用于将输入数据(如时间序列数据)转换为适合模型处理的向量表示。这种嵌入是序列建模中非常重要的一部分,特别是在处理时间序列数据时。

主要模块解释

  1. PositionalEmbedding(位置嵌入)

    • 作用:为每个序列位置编码一个向量,使得模型能够感知序列中每个位置的顺序。常用于 Transformer 模型中,因为这种模型本身不具备位置信息的感知能力。
    • 实现细节
      • 使用正弦和余弦函数为不同位置生成不同的嵌入,位置嵌入向量的奇偶位分别使用 sincos
      • pe 是一个预计算的二维张量,包含最大序列长度(max_len)和每个位置的嵌入(d_model 的维度)。
  2. TokenEmbedding(令牌嵌入)

    • 作用:将输入的数据通过一维卷积进行嵌入。可以理解为将输入的多维数据映射到一个高维的特征空间。
    • 实现细节
      • 使用 1D 卷积将输入通道数(c_in)转换为模型维度(d_model)。
      • 初始化权重时使用 Kaiming 初始化方法,适合 ReLU 和 LeakyReLU 激活函数。
  3. FixedEmbedding(固定嵌入)

    • 作用:通过固定的正弦和余弦函数生成嵌入,类似于位置嵌入,但它是用于固定长度的输入数据。
    • 实现细节
      • 每个位置的奇数位使用 sin,偶数位使用 cos
      • 权重矩阵 w 是固定的,不参与训练。
  4. TemporalEmbedding(时间嵌入)

    • 作用:为时间特征(如小时、天、月份等)生成嵌入,特别适用于处理时间序列数据。
    • 实现细节
      • 支持嵌入不同的时间粒度(如分钟、小时、周几、天、月份等)。
      • 可以选择使用固定嵌入(FixedEmbedding)或可学习嵌入(nn.Embedding)。
  5. TimeFeatureEmbedding(时间特征嵌入)

    • 作用:为特定时间粒度(如小时、天、周等)的数值进行线性嵌入。
    • 实现细节
      • 根据 freq_map 映射不同的时间粒度,并使用线性变换嵌入。
  6. DataEmbedding(数据嵌入)

    • 作用:组合了 TokenEmbeddingPositionalEmbeddingTemporalEmbedding,生成完整的嵌入向量。
    • 实现细节
      • 先将输入数据通过 TokenEmbedding 进行嵌入。
      • 然后结合 TemporalEmbedding 提取时间相关特征。
      • 还会加入位置嵌入(PositionalEmbedding)来提供序列位置信息。
      • 最后通过 Dropout 防止过拟合。
  7. DataEmbedding_wo_pos(无位置嵌入的数据嵌入)

    • 作用:与 DataEmbedding 类似,但不包含位置嵌入。
    • 实现细节
      • 只包含 TokenEmbeddingTemporalEmbedding,省略位置嵌入部分。
  8. DataEmbedding_wo_pos_temp(无位置和时间嵌入的数据嵌入)

    • 作用:只进行 TokenEmbedding,不包括时间和位置嵌入。
    • 实现细节
      • 仅使用 TokenEmbedding 来处理输入数据。
  9. DataEmbedding_wo_temp(无时间嵌入的数据嵌入)

    • 作用:不包括时间嵌入,但仍然使用位置嵌入。
    • 实现细节
      • 包含 TokenEmbeddingPositionalEmbedding,省略了 TemporalEmbedding

嵌入层的整体作用

这些嵌入层的主要作用是将输入的时间序列数据转换成向量表示,这样可以更好地被神经网络模型(如 Transformer 或 CNN)处理。时间序列数据通常包含时间信息(如天、小时等),嵌入层通过提取这些时间信息来增强模型对时间依赖性的感知。通过组合 TokenEmbedding(提取特征)、PositionalEmbedding(位置信息)和 TemporalEmbedding(时间信息),模型可以捕捉到输入数据的复杂时空关系。

dctnet.py代码:

from distutils.command.config import config
import torch.nn as nn
import math
import numpy as np
import torch
try:from torch import irfftfrom torch import rfft
except ImportError:def rfft(x, d):t = torch.fft.fft(x, dim = (-d))r = torch.stack((t.real, t.imag), -1)return rdef irfft(x, d):t = torch.fft.ifft(torch.complex(x[:,:,0], x[:,:,1]), dim = (-d))return t.realdef dct(x, norm=None):"""Discrete Cosine Transform, Type II (a.k.a. the DCT)For the meaning of the parameter `norm`, see:https://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.fftpack.dct.html:param x: the input signal:param norm: the normalization, None or 'ortho':return: the DCT-II of the signal over the last dimension"""x_shape = x.shapeN = x_shape[-1]x = x.contiguous().view(-1, N)v = torch.cat([x[:, ::2], x[:, 1::2].flip([1])], dim=1)# Vc = torch.fft.rfft(v, 1, onesided=False)Vc = rfft(v, 1)k = - torch.arange(N, dtype=x.dtype, device=x.device)[None, :] * np.pi / (2 * N)W_r = torch.cos(k)W_i = torch.sin(k)V = Vc[:, :, 0] * W_r - Vc[:, :, 1] * W_iif norm == 'ortho':V[:, 0] /= np.sqrt(N) * 2V[:, 1:] /= np.sqrt(N / 2) * 2V = 2 * V.view(*x_shape)return V# class senet_block(nn.Module):
#     def __init__(self, channel=512, ratio=1):
#         super(dct_channel_block, self).__init__()
#         self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
#         self.fc = nn.Sequential(
#                 nn.Linear(channel, channel // 4, bias=False),
#                 nn.ReLU(inplace=True),
#                 nn.Linear(channel //4, channel, bias=False),
#                 nn.Sigmoid()
#         )#     def forward(self, x):
#         # b, c, l = x.size() # (B,C,L)
#         # y = self.avg_pool(x) # (B,C,L) -> (B,C,1)
#         # print("y",y.shape)
#         x = x.permute(0,2,1)
#         b, c, l = x.size() 
#         y = self.avg_pool(x).view(b, c) # (B,C,L) ->(B,C,1)
#         # print("y",y.shape)
#         # y = self.fc(y).view(b, c, 96)#         y = self.fc(y).view(b,c,1)
#         # print("y",y.shape)
#         # return x * y
#         return (x*y).permute(0,2,1)
class dct_channel_block(nn.Module):def __init__(self, channel):super(dct_channel_block, self).__init__()# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovationself.fc = nn.Sequential(nn.Linear(channel, channel*2, bias=False),nn.Dropout(p=0.1),nn.ReLU(inplace=True),nn.Linear( channel*2, channel, bias=False),nn.Sigmoid())# self.dct_norm = nn.LayerNorm([512], eps=1e-6)self.dct_norm = nn.LayerNorm([96], eps=1e-6)#for lstm on length-wise# self.dct_norm = nn.LayerNorm([36], eps=1e-6)#for lstm on length-wise on ill with input =36def forward(self, x):b, c, l = x.size() # (B,C,L) (32,96,512)# y = self.avg_pool(x) # (B,C,L) -> (B,C,1)# y = self.avg_pool(x).view(b, c) # (B,C,L) -> (B,C,1)# print("y",y.shape# y = self.fc(y).view(b, c, 96)list = []for i in range(c):freq=dct(x[:,i,:])     # print("freq-shape:",freq.shape)list.append(freq)stack_dct=torch.stack(list,dim=1)stack_dct = torch.tensor(stack_dct)'''for traffic mission:f_weight = self.dct_norm(f_weight.permute(0,2,1))#matters for traffic datasets'''lr_weight = self.dct_norm(stack_dct) lr_weight = self.fc(stack_dct)lr_weight = self.dct_norm(lr_weight) # print("lr_weight",lr_weight.shape)return x *lr_weight #resultif __name__ == '__main__':tensor = torch.rand(8,7,96)dct_model = dct_channel_block()result = dct_model.forward(tensor) print("result.shape:",result.shape)

这段代码实现了基于离散余弦变换(DCT)的通道块,用于增强神经网络中的特征表示。以下是代码的详细解释:

1. 导入库和兼容性处理

from distutils.command.config import config
import torch.nn as nn
import math
import numpy as np
import torch

导入了 PyTorch 的 nn 模块用于神经网络构建,mathnumpy 用于数学运算。

try:from torch import irfftfrom torch import rfft
except ImportError:def rfft(x, d):t = torch.fft.fft(x, dim=(-d))r = torch.stack((t.real, t.imag), -1)return rdef irfft(x, d):t = torch.fft.ifft(torch.complex(x[:, :, 0], x[:, :, 1]), dim=(-d))return t.real

这一段代码用于兼容旧版本的 PyTorch,定义了 rfftirfft 函数。在较新的 PyTorch 版本中,rfftirfft 函数可能已被替换,因此这里使用自定义实现确保代码的兼容性。

2. DCT(离散余弦变换)

def dct(x, norm=None):...

这个函数实现了 离散余弦变换 (DCT),具体是 DCT-II 类型。DCT 是一种频域转换,通常用于信号处理和特征提取,它可以将时域信号转换为频域表示。

主要步骤:
  • 重新排列数据:将输入张量中的偶数和奇数索引部分组合在一起。
  • 进行 FFT(快速傅里叶变换):通过 rfft 函数进行快速傅里叶变换,将信号转换到频域。
  • 正弦和余弦加权:通过 cossin 函数对 FFT 的结果进行加权,得到离散余弦变换的结果。
  • 归一化处理:如果指定 norm='ortho',进行正交归一化。

3. DCT 通道块

class dct_channel_block(nn.Module):def __init__(self, channel):super(dct_channel_block, self).__init__()self.fc = nn.Sequential(nn.Linear(channel, channel * 2, bias=False),nn.Dropout(p=0.1),nn.ReLU(inplace=True),nn.Linear(channel * 2, channel, bias=False),nn.Sigmoid())self.dct_norm = nn.LayerNorm([96], eps=1e-6)

这个类实现了基于 DCT 的通道块,主要用于对输入张量进行变换,增强特征的表达能力。

主要组件:
  • 全连接层(fc:使用了两层全连接层,结合 ReLU 激活函数和 Dropout,来对 DCT 变换后的频域特征进行进一步处理。
  • LayerNorm(dct_norm:用于标准化处理,以稳定训练过程。
前向传播 (forward):
def forward(self, x):b, c, l = x.size() # (B,C,L) (32,96,512)list = []for i in range(c):freq = dct(x[:, i, :])list.append(freq)stack_dct = torch.stack(list, dim=1)stack_dct = torch.tensor(stack_dct)lr_weight = self.dct_norm(stack_dct)lr_weight = self.fc(stack_dct)lr_weight = self.dct_norm(lr_weight)return x * lr_weight
  • 输入x 的形状为 (B, C, L),其中 B 是批量大小,C 是通道数,L 是序列长度。
  • DCT 变换:对于每个通道,计算 DCT,得到频域表示。
  • 处理 DCT 结果:将 DCT 结果通过全连接层和标准化层处理,得到特征权重。
  • 输出:将原始输入与 DCT 处理后的权重相乘,得到最终的输出。

4. 主函数

if __name__ == '__main__':tensor = torch.rand(8, 7, 96)dct_model = dct_channel_block(96)result = dct_model.forward(tensor)print("result.shape:", result.shape)

这部分代码用于测试 dct_channel_block 类:

  • 输入:随机生成一个形状为 (8, 7, 96) 的张量,代表批量大小为 8,通道数为 7,序列长度为 96。
  • 模型实例化:实例化 dct_channel_block,并使用前向传播方法对输入进行处理。
  • 输出:打印处理后的输出张量的形状。

总结

  • DCT 的作用:DCT 可以将时域信号转换为频域信号,在信号处理和特征提取中非常有用。通过这种变换,可以捕捉输入数据中的频率模式,从而增强模型的表达能力。
  • 通道块的作用dct_channel_block 结合 DCT 和全连接层来处理输入特征,并生成频域权重以调整输入特征。
  • 代码的目的:实现了一种结合 DCT 的特征增强机制,旨在通过频域处理提升神经网络在时序数据上的表现。

LSTM.py代码:

from pip import main
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct
import numpy as np
import math
from layers.dctnet import dct_channel_block,dct
# from dctnet import dct_channel_block,dct #for parameters calc
import argparse
from fvcore.nn import FlopCountAnalysis,parameter_count_table  device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class Model(nn.Module):#2022.11.7修改前,这个Model能跑通def __init__(self,configs,input_size=None,hidden_size=16, output_size=None,batch_size=None,num_layers=2):#input_size与output_size是通道数super(Model, self).__init__()# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovationself.seq_len = configs.seq_lenself.pred_len = configs.pred_lenself.input_size = configs.enc_in #channelself.hidden_size = hidden_size #输出维度 也就是输出通道self.num_layers = num_layersself.output_size = configs.enc_in #channel #输出个数self.num_directions = 1 # 单向LSTMself.batch_size = configs.batch_sizeself.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)self.linear = nn.Linear(self.hidden_size, self.output_size)#通道数对齐层self.linear_out_len = nn.Linear(self.seq_len, self.pred_len)#输出长度对齐层# self.linear = nn.Linear()#add dce-blockself.dct_layer=dct_channel_block(configs.seq_len)self.dct_norm = nn.LayerNorm([configs.enc_in], eps=1e-6)#作为模块一般normal channel效果好点# self.dct_norm = nn.LayerNorm([512], eps=1e-6)#作为模块一般normal channel效果好点def forward(self, x):# x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL# b, c, l = x.size() # (B,C,L)# batch_size, seq_len = x.shape[0], x.shape[1]# h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size)# c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size)h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)# output(batch_size, seq_len, num_directions * hidden_size)output, _ = self.lstm(x, (h_0, c_0)) # output(5, 30, 64)# print("output.shape:",output.shape)#result.shape: torch.Size([8, 96, 8])result = self.linear(output)  # (B,L,C)# output.shape: torch.Size([8, 96, 16])#16是hidden_size# result.shape: torch.Size([8, 96, 8])#8是为了符合通道数对齐,exchange_rate有8个变量'''dct'''result = self.dct_layer(result.permute(0,2,1))#加入dct模块,mse降低0.12个点result_len =  self.linear_out_len(result)#为了输出长度对齐 (8,8,96)'''dct'''# result_len =  self.linear_out_len(result.permute(0,2,1))#为了输出长度对齐 (8,8,96)# print("result.shape:",result.shape)#result.shape: torch.Size([8, 96, 8])# result = result.permute(0,2,1)#(B,L,C)=》(B,C,L)# result = self.dct_layer(result_len)#加入dct模块,mse降低0.12个点# result = result.permute(0,2,1)#(B,C,L)=》(B,L,C)return  result_len.permute(0,2,1)
# lstm = LSTM(input_size=7,hidden_size=64, output_size=7,batch_size=8,num_layers=5).to(device)
# tensor = torch.rand(8, 96, 7).to(device)
# result = lstm(tensor)
# print("result.shape:",result.shape)
if __name__ == '__main__': parser = argparse.ArgumentParser(description='Autoformer & Transformer family for Time Series Forecasting')# forecasting taskparser.add_argument('--seq_len', type=int, default=96, help='input sequence length')parser.add_argument('--label_len', type=int, default=48, help='start token length')parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')parser.add_argument('--embed_type', type=int, default=0, help='0: default 1: value embedding + temporal embedding + positional embedding 2: value embedding + temporal embedding 3: value embedding + positional embedding 4: value embedding')parser.add_argument('--enc_in', type=int, default=7, help='encoder input size') # DLinear with --individual, use this hyperparameter as the number of channelsparser.add_argument('--dec_in', type=int, default=7, help='decoder input size')parser.add_argument('--c_out', type=int, default=7, help='output size')parser.add_argument('--d_model', type=int, default=512, help='dimension of model')parser.add_argument('--n_heads', type=int, default=8, help='num of heads')parser.add_argument('--e_layers', type=int, default=2, help='num of encoder layers')parser.add_argument('--d_layers', type=int, default=1, help='num of decoder layers')# optimizationparser.add_argument('--num_workers', type=int, default=10, help='data loader num workers')parser.add_argument('--itr', type=int, default=2, help='experiments times')parser.add_argument('--train_epochs', type=int, default=10, help='train epochs')parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')parser.add_argument('--patience', type=int, default=3, help='early stopping patience')parser.add_argument('--learning_rate', type=float, default=0.0001, help='optimizer learning rate')parser.add_argument('--des', type=str, default='test', help='exp description')parser.add_argument('--loss', type=str, default='mse', help='loss function')parser.add_argument('--lradj', type=str, default='type1', help='adjust learning rate')parser.add_argument('--use_amp', action='store_true', help='use automatic mixed precision training', default=False)# GPUparser.add_argument('--use_gpu', type=bool, default=True, help='use gpu')parser.add_argument('--gpu', type=int, default=0, help='gpu')parser.add_argument('--use_multi_gpu', action='store_true', help='use multiple gpus', default=False)parser.add_argument('--devices', type=str, default='0,1,2,3', help='device ids of multile gpus')parser.add_argument('--test_flop', action='store_true', default=False, help='See utils/tools for usage')args = parser.parse_args()model = Model(args)print(parameter_count_table(model))

这段代码实现了一个基于 LSTM(长短期记忆网络)和 DCT(离散余弦变换)的时间序列预测模型,并且可以处理不同的模型配置和超参数。这段代码主要用于时间序列数据的预测任务,具体解释如下:

1. 模型类 Model

class Model(nn.Module):def __init__(self, configs, input_size=None, hidden_size=16, output_size=None, batch_size=None, num_layers=2):super(Model, self).__init__()...
  • configs:通过命令行传递的配置参数(如序列长度、批量大小、隐藏层大小等)。
  • input_size:输入的特征数量(通道数),从 configs.enc_in 中获取。
  • hidden_size:LSTM 隐藏层的大小,默认是 16,即 LSTM 输出的特征维度。
  • num_layers:LSTM 网络的层数,默认为 2
  • output_size:模型的输出特征数,从 configs.enc_in 中获取。

该模型的结构包含:

  • 一个 LSTM 层,用于处理输入的时间序列数据。
  • 两个 线性层linearlinear_out_len),用于通道数和序列长度的对齐。
  • 一个基于 DCT 的增强模块 dct_channel_block,用于提取频域特征。

2. LSTM 和线性层

self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
self.linear = nn.Linear(self.hidden_size, self.output_size)  # 通道数对齐
self.linear_out_len = nn.Linear(self.seq_len, self.pred_len)  # 输出长度对齐
  • LSTM 层:输入维度为 input_size,输出维度为 hidden_size,LSTM 层数为 num_layers
  • 线性层 linear:用于将 LSTM 的输出从 hidden_size 映射回输入的特征数量(通道数)。
  • 线性层 linear_out_len:用于将序列长度从 seq_len 映射到预测的序列长度 pred_len

3. DCT 通道块

self.dct_layer = dct_channel_block(configs.seq_len)
self.dct_norm = nn.LayerNorm([configs.enc_in], eps=1e-6)
  • dct_channel_block:通过 DCT(离散余弦变换)提取频域信息,并对输入特征进行处理。它有助于改善模型对时间序列中的周期性或频域特征的捕捉能力。
  • dct_norm:标准化层,用于将经过 DCT 处理的特征进行归一化。

4. 前向传播函数 forward

def forward(self, x):...h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)output, _ = self.lstm(x, (h_0, c_0))  # LSTM 前向传播result = self.linear(output)  # 通道数对齐result = self.dct_layer(result.permute(0, 2, 1))  # DCT 模块result_len = self.linear_out_len(result)  # 长度对齐return result_len.permute(0, 2, 1)
  • 初始化隐藏状态和细胞状态h_0c_0 是 LSTM 的初始隐藏状态和细胞状态。
  • LSTM 前向传播:输入 x 经过 LSTM 层,得到 output,并进行通道数对齐。
  • DCT 处理:经过 LSTM 输出后的结果通过 DCT 模块处理,并对频域特征进行操作。
  • 长度对齐:通过 linear_out_len 线性层调整预测序列的长度为 pred_len
  • 输出:返回最终的预测结果,并调整形状。

5. 命令行参数配置

parser = argparse.ArgumentParser(description='Autoformer & Transformer family for Time Series Forecasting')
parser.add_argument('--seq_len', type=int, default=96, help='input sequence length')
parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
parser.add_argument('--enc_in', type=int, default=7, help='encoder input size')
parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')
...
args = parser.parse_args()
model = Model(args)
  • seq_len:输入序列的长度。
  • pred_len:预测序列的长度。
  • enc_in:输入特征的通道数。
  • batch_size:批量大小。

通过 argparse 模块,可以通过命令行传入这些参数来控制模型的配置。

6. 模型参数和 FLOPs 计算

from fvcore.nn import FlopCountAnalysis, parameter_count_table  
print(parameter_count_table(model))
  • parameter_count_table:用于计算模型的参数数量。
  • FlopCountAnalysis:用于分析模型的浮点操作(FLOPs)。

这段代码通过 fvcore 库计算模型的参数量,并输出这些信息以便进行模型复杂度的评估。

总结

  • 该模型结合了 LSTMDCT,用于时间序列数据的预测。
  • 通过 DCT 增强模型对频域特征的捕捉能力,有助于提高模型的预测性能。
  • 通过命令行参数,模型的配置可以灵活调整,并且支持多 GPU 训练和 FLOPs 计算。

oneCNN.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct
import numpy as np
import math
from layers.dctnet import dct_channel_block,dctclass Model(nn.Module):#2022.11.7修改前,这个Model能跑通#CNNdef __init__(self,configs):super(Model, self).__init__()# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovationself.seq_len = configs.seq_lenself.pred_len = configs.pred_lenself.channel_num = configs.enc_inself.conv1 = nn.Conv1d(in_channels=self.channel_num,out_channels=self.channel_num,kernel_size=5,stride=1,padding=2) #输入通道数和输出通道数应该一样# input = torch.randn(32,7,96)# batch_size x text_len x embedding_size -> batch_size x embedding_size x text_lenself.dct_layer=dct_channel_block(self.seq_len)self.linear = nn.Linear(self.seq_len,self.pred_len)# self.dct_norm = nn.LayerNorm([7], eps=1e-6)#作为模块一般normal channel效果好点def forward(self, x):# print("x.shape:",x.shape)x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL and 1-d conv# b, c, l = x.size() # (B,C,L)out = self.conv1(x) #b,c,lout = self.linear(x)#b,c,l# print(out.size())# out  = self.dct_layer(out)#加入dct模块,mse降低0.12个点# out = self.linear(x)#b,c,l# x = x+mid# x = x.permute(0,2,1) # x = self.dct_norm(x) #norm 144# x = x.permute(0,2,1) return  (out).permute(0,2,1)#b,l,c

这段代码定义了一个基于 CNN(卷积神经网络)DCT(离散余弦变换) 的简单时间序列预测模型。模型通过 1D 卷积和 DCT 增强特征提取,结合线性层对输出进行预测。

代码详解

1. 模型初始化
class Model(nn.Module):def __init__(self, configs):super(Model, self).__init__()self.seq_len = configs.seq_lenself.pred_len = configs.pred_lenself.channel_num = configs.enc_inself.conv1 = nn.Conv1d(in_channels=self.channel_num, out_channels=self.channel_num, kernel_size=5, stride=1, padding=2)self.dct_layer = dct_channel_block(self.seq_len)self.linear = nn.Linear(self.seq_len, self.pred_len)
  • seq_len:输入序列的长度。
  • pred_len:预测序列的长度。
  • channel_num:输入的通道数,代表时间序列的特征数量(例如 7 维度的输入表示有 7 个不同的特征)。
网络组件:
  • conv1:一维卷积层,输入和输出通道数保持一致,卷积核大小为 5,步幅为 1,并使用 padding=2 保证输入和输出序列的长度保持不变。

  • dct_layer:通过 dct_channel_block 提取频域特征,用离散余弦变换增强特征。

  • linear:线性层用于将序列长度从 seq_len 映射到预测长度 pred_len,在输出前对时间序列进行长度调整。

2. 前向传播函数 forward
def forward(self, x):x = x.permute(0, 2, 1)  # 转置输入,调整维度顺序 (B, L, C) -> (B, C, L)out = self.conv1(x)  # 应用一维卷积,提取特征 (B, C, L)out = self.linear(x)  # 应用线性层进行序列长度变换return out.permute(0, 2, 1)  # 再次转置,调整维度回到 (B, L, C)
  • 输入:输入 x 的形状为 (B, L, C),即 B 表示批量大小,L 表示序列长度,C 表示输入的特征维度(通道数)。

  • 转置:在进行 1D 卷积前,先通过 x.permute(0, 2, 1) 将输入从形状 (B, L, C) 转为 (B, C, L),使得输入符合 1D 卷积的要求。

  • 卷积:通过 conv1 提取特征,卷积后维度仍然是 (B, C, L)

  • 线性层:将经过卷积后的特征通过线性层 linear,将序列长度从 seq_len 转换为 pred_len

  • 输出:最后将输出再转置回 (B, L, C),作为最终的预测结果。

可选模块(未激活的部分)

# out  = self.dct_layer(out)  # DCT 层:可选的 DCT 增强模块,用于通过离散余弦变换进一步处理特征。
# out = self.linear(x)  # 可选的线性层,可以用于对卷积后输出再进行进一步处理。
# x = self.dct_norm(x)  # 标准化层,用于标准化 DCT 后的特征。

这些部分暂时被注释掉了,表示 DCT 层和标准化层可能是可选的改进点。在某些情况下,通过 DCT 提取频域特征可以提高模型的性能。

3. 总结

  • 卷积操作:1D 卷积用于捕捉输入时间序列的局部特征。
  • DCT 操作:可选的 DCT 层能够将时域信号转换为频域信号,捕捉输入数据中的周期性或频率特征,从而进一步增强模型的表现。
  • 线性映射:线性层负责调整序列的长度,保证输入和输出长度匹配。

这个模型设计相对简单,但由于使用了卷积操作和 DCT 层,因此在处理具有时空依赖性的数据(如时间序列预测)时有一定优势。


Transformer.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
from layers.Transformer_EncDec import Decoder, DecoderLayer, Encoder, EncoderLayer, ConvLayer
from layers.SelfAttention_Family import FullAttention, AttentionLayer
from layers.Embed import DataEmbedding,DataEmbedding_wo_pos,DataEmbedding_wo_temp,DataEmbedding_wo_pos_temp
import numpy as np
from layers.dctnet import dct_channel_block,dctclass Model(nn.Module):"""Vanilla Transformer with O(L^2) complexity"""def __init__(self, configs):super(Model, self).__init__()self.pred_len = configs.pred_lenself.output_attention = configs.output_attention# Embeddingif configs.embed_type == 0:self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,configs.dropout)self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq,configs.dropout)elif configs.embed_type == 1:self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,configs.dropout)self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq,configs.dropout)elif configs.embed_type == 2:self.enc_embedding = DataEmbedding_wo_pos(configs.enc_in, configs.d_model, configs.embed, configs.freq,configs.dropout)self.dec_embedding = DataEmbedding_wo_pos(configs.dec_in, configs.d_model, configs.embed, configs.freq,configs.dropout)elif configs.embed_type == 3:self.enc_embedding = DataEmbedding_wo_temp(configs.enc_in, configs.d_model, configs.embed, configs.freq,configs.dropout)self.dec_embedding = DataEmbedding_wo_temp(configs.dec_in, configs.d_model, configs.embed, configs.freq,configs.dropout)elif configs.embed_type == 4:self.enc_embedding = DataEmbedding_wo_pos_temp(configs.enc_in, configs.d_model, configs.embed, configs.freq,configs.dropout)self.dec_embedding = DataEmbedding_wo_pos_temp(configs.dec_in, configs.d_model, configs.embed, configs.freq,configs.dropout)# Encoderself.encoder = Encoder([EncoderLayer(AttentionLayer(FullAttention(False, configs.factor, attention_dropout=configs.dropout,output_attention=configs.output_attention), configs.d_model, configs.n_heads),configs.d_model,configs.d_ff,dropout=configs.dropout,activation=configs.activation) for l in range(configs.e_layers)],norm_layer=torch.nn.LayerNorm(configs.d_model))# Decoderself.decoder = Decoder([DecoderLayer(AttentionLayer(FullAttention(True, configs.factor, attention_dropout=configs.dropout, output_attention=False),configs.d_model, configs.n_heads),AttentionLayer(FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=False),configs.d_model, configs.n_heads),configs.d_model,configs.d_ff,dropout=configs.dropout,activation=configs.activation,)for l in range(configs.d_layers)],norm_layer=torch.nn.LayerNorm(configs.d_model),projection=nn.Linear(configs.d_model, configs.c_out, bias=True))self.dct_layer=dct_channel_block(512)self.dct_norm = nn.LayerNorm([512], eps=1e-6)def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):enc_out = self.enc_embedding(x_enc, x_mark_enc)enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)# print("enc_out.shape:",enc_out.shape)#enc_out.shape: torch.Size([32, 96, 512])#加入dct模块# mid  = self.dct_layer(enc_out)# enc_out = enc_out+mid# enc_out = self.dct_norm(enc_out) #norm 144dec_out = self.dec_embedding(x_dec, x_mark_dec)dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)if self.output_attention:return dec_out[:, -self.pred_len:, :], attnselse:return dec_out[:, -self.pred_len:, :]  # [B, L, D]

这段代码实现了一个结合 TransformerDCT(离散余弦变换) 的神经网络模型,主要用于时间序列预测任务。模型结构基于 Transformer 编码器-解码器框架,支持多种嵌入方式,并集成了 DCT 通道块用于增强特征处理。

1. 模型简介

该模型的核心是标准的 Transformer 编码器-解码器结构,具有以下主要模块:

  • 嵌入模块:通过 DataEmbedding 和其变种为输入数据生成嵌入表示。
  • 编码器(Encoder):基于多层自注意力机制的编码器,用于提取输入序列的高维特征。
  • 解码器(Decoder):结合自注意力和交叉注意力的解码器,用于生成预测序列。
  • DCT 模块:用于对编码器输出进行频域增强处理(当前注释掉,作为可选模块)。

2. 嵌入模块

if configs.embed_type == 0:self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq, configs.dropout)self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq, configs.dropout)
  • DataEmbedding:为输入数据生成嵌入。嵌入方式由 configs.embed_type 控制,支持五种不同类型的嵌入:
    • 0:默认的嵌入(值、时间、位置嵌入的组合)。
    • 1:与 embed_type=0 相同。
    • 2:没有位置嵌入。
    • 3:没有时间嵌入。
    • 4:没有时间和位置嵌入。

3. 编码器(Encoder)

self.encoder = Encoder([EncoderLayer(AttentionLayer(FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=configs.output_attention),configs.d_model, configs.n_heads),configs.d_model,configs.d_ff,dropout=configs.dropout,activation=configs.activation) for l in range(configs.e_layers)],norm_layer=torch.nn.LayerNorm(configs.d_model)
)
  • Encoder:由多个 EncoderLayer 组成,每层包括自注意力机制和前馈神经网络。
  • FullAttention:注意力机制,False 表示不使用掩码(即标准自注意力)。
  • LayerNorm:标准化层,用于稳定训练。

4. 解码器(Decoder)

self.decoder = Decoder([DecoderLayer(AttentionLayer(FullAttention(True, configs.factor, attention_dropout=configs.dropout, output_attention=False),configs.d_model, configs.n_heads),AttentionLayer(FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=False),configs.d_model, configs.n_heads),configs.d_model,configs.d_ff,dropout=configs.dropout,activation=configs.activation,)for l in range(configs.d_layers)],norm_layer=torch.nn.LayerNorm(configs.d_model),projection=nn.Linear(configs.d_model, configs.c_out, bias=True)
)
  • Decoder:由多个 DecoderLayer 组成,每层结合了自注意力和交叉注意力机制。
  • projection:最后一层是线性层,用于将解码器的输出投影到目标输出维度(configs.c_out)。

5. DCT 通道块

self.dct_layer = dct_channel_block(512)
self.dct_norm = nn.LayerNorm([512], eps=1e-6)
  • dct_channel_block:DCT 模块用于对编码器的输出进行频域变换,但该部分目前被注释掉。
  • dct_norm:LayerNorm 层用于对经过 DCT 的特征进行标准化处理。

6. 前向传播函数

def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):enc_out = self.enc_embedding(x_enc, x_mark_enc)enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)# DCT 模块(可选)# mid  = self.dct_layer(enc_out)# enc_out = enc_out + mid# enc_out = self.dct_norm(enc_out)dec_out = self.dec_embedding(x_dec, x_mark_dec)dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)if self.output_attention:return dec_out[:, -self.pred_len:, :], attnselse:return dec_out[:, -self.pred_len:, :]
  • 输入

    • x_encx_mark_enc 是编码器的输入和时间标记。
    • x_decx_mark_dec 是解码器的输入和时间标记。
    • enc_self_maskdec_self_maskdec_enc_mask 是可选的注意力掩码。
  • 编码器:通过 enc_embeddingencoder 模块处理编码器输入。

  • DCT:注释掉的 DCT 模块可以对编码器输出进行频域处理。

  • 解码器:通过 dec_embeddingdecoder 模块处理解码器输入,并结合编码器的输出进行预测。

  • 输出

    • 如果 output_attentionTrue,返回解码器的输出和注意力矩阵。
    • 否则,只返回解码器的最终预测。

7. 总结

  • 该模型是一个结合 Transformer 和 DCT 的时间序列预测模型。
  • Transformer 编码器-解码器架构使模型能够捕捉复杂的时间依赖关系。
  • DCT 模块作为可选部分,能够增强模型对频域特征的捕捉。
  • 模型支持多种嵌入方式,通过调整 configs.embed_type 来选择不同的嵌入策略。

Linear.py代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dctimport numpy as np
import math
# class Model(nn.Module):SENET for ETTmx#     """
#     Just one Linear layer
#     """
#     def __init__(self,configs,channel=7,ratio=1):
#         super(Model, self).__init__()#         self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
#         self.fc = nn.Sequential(
#                 nn.Linear(7,14, bias=False),
#                 nn.Dropout(p=0.1),
#                 nn.ReLU(inplace=True) ,
#                 nn.Linear(14,7, bias=False),
#                 nn.Sigmoid()
#         )
#         self.seq_len = configs.seq_len
#         self.pred_len = configs.pred_len#         self.Linear_More_1 = nn.Linear(self.seq_len,self.pred_len * 2)
#         self.Linear_More_2 = nn.Linear(self.pred_len*2,self.pred_len)
#         self.relu = nn.ReLU()
#         self.gelu = nn.GELU()    #         self.drop = nn.Dropout(p=0.1)
#         # Use this line if you want to visualize the weights
#        
#     def forward(self, x):
#         # x: [Batch, Input length, Channel]
#   
#         x = x.permute(0,2,1) # (B,L,C)->(B,C,L)
#         b, c, l = x.size() # (B,C,L)
#         y = self.avg_pool(x).view(b, c) # (B,C,L) #         # np.save('f_weight.npy', f_weight_np)
# #         # np.save('%d f_weight.npy' %epoch, f_weight_np)
#         # print("y",y.shape)
#         # return (x * y).permute(0,2,1)
#         return (z).permute(0,2,1)class my_Layernorm(nn.Module):"""Special designed layernorm for the seasonal part"""def __init__(self, channels):super(my_Layernorm, self).__init__()self.layernorm = nn.LayerNorm(channels)def forward(self, x):x_hat = self.layernorm(x)bias = torch.mean(x_hat, dim=1).unsqueeze(1).repeat(1, x.shape[1], 1)return x_hat - bias
class Model(nn.Module):def __init__(self,configs,channel=96,ratio=1):super(Model, self).__init__()# self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovationself.seq_len = configs.seq_lenself.pred_len = configs.pred_lenself.channel_num = configs.enc_inself.fc = nn.Sequential(nn.Linear(channel, channel*2, bias=False),nn.Dropout(p=0.1),nn.ReLU(inplace=True),nn.Linear( channel*2, channel, bias=False),nn.Sigmoid())self.fc_inverse = nn.Sequential(nn.Linear(channel, channel//2, bias=False),nn.Dropout(p=0.1),nn.ReLU(inplace=True),nn.Linear( channel//2, channel, bias=False),nn.Sigmoid())# self.fc_plot = nn.Linear(channel, channel, bias=False)self.mid_Linear = nn.Linear(self.seq_len, self.seq_len)self.Linear = nn.Linear(self.seq_len, self.pred_len)self.Linear_1 = nn.Linear(self.seq_len, self.pred_len)# self.dct_norm = nn.LayerNorm([self.channel_num], eps=1e-6)self.dct_norm = nn.LayerNorm(self.seq_len, eps=1e-6)# self.my_layer_norm = nn.LayerNorm([96], eps=1e-6)def forward(self, x):x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forLb, c, l = x.size() # (B,C,L)list = []for i in range(c):#i represent channel freq=dct.dct(x[:,i,:])     #dct# print("freq-shape:",freq.shape)list.append(freq)stack_dct=torch.stack(list,dim=1) stack_dct = torch.tensor(stack_dct)#(B,L,C)stack_dct = self.dct_norm(stack_dct)#matters for trafficf_weight = self.fc(stack_dct)f_weight = self.dct_norm(f_weight)#matters for traffic#visualization for fecam tensorf_weight_cpu = f_weightf_weight_np = f_weight_cpu.cpu().detach().numpy()np.save('f_weight_weather_wf.npy', f_weight_np)# np.save('%d f_weight.npy' %epoch, f_weight_np)# f_weight = self.dct_norm(f_weight.permute(0,2,1))#matters for traffic# result = self.Linear(x)#forL# f_weight_np = result.cpu().detach().numpy()# np.save('f_weight.npy', f_weight_np)# x = x.permute(0,2,1)# result = self.Linear((x *(f_weight_inverse)))#forL result = self.Linear((x *(f_weight)))#forLreturn  result.permute(0,2,1)

这段代码实现了一个时间序列预测模型,模型结合了 DCT(离散余弦变换)线性层 来对输入的序列进行处理,并输出预测结果。模型在数据处理中引入了频域变换,并对频域特征进行了加权处理。以下是详细的解释:

1. my_Layernorm

class my_Layernorm(nn.Module):"""Special designed layernorm for the seasonal part"""def __init__(self, channels):super(my_Layernorm, self).__init__()self.layernorm = nn.LayerNorm(channels)def forward(self, x):x_hat = self.layernorm(x)bias = torch.mean(x_hat, dim=1).unsqueeze(1).repeat(1, x.shape[1], 1)return x_hat - bias
  • 作用:实现了一种特殊的 LayerNorm 层,它对每个样本进行标准化,并减去每个样本的均值,目的是更好地处理序列中具有季节性或周期性的特征。

2. 模型 Model 初始化

class Model(nn.Module):def __init__(self, configs, channel=96, ratio=1):super(Model, self).__init__()self.seq_len = configs.seq_lenself.pred_len = configs.pred_lenself.channel_num = configs.enc_inself.fc = nn.Sequential(nn.Linear(channel, channel*2, bias=False),nn.Dropout(p=0.1),nn.ReLU(inplace=True),nn.Linear(channel*2, channel, bias=False),nn.Sigmoid())self.fc_inverse = nn.Sequential(nn.Linear(channel, channel//2, bias=False),nn.Dropout(p=0.1),nn.ReLU(inplace=True),nn.Linear(channel//2, channel, bias=False),nn.Sigmoid())self.mid_Linear = nn.Linear(self.seq_len, self.seq_len)self.Linear = nn.Linear(self.seq_len, self.pred_len)self.dct_norm = nn.LayerNorm(self.seq_len, eps=1e-6)
  • fcfc_inverse:两个线性层用于调整输入通道的权重。fc 是增大通道维度的网络,fc_inverse 是缩小通道维度的网络,主要用于特征的重整和处理。
  • mid_LinearLinear:分别用于序列长度的映射。mid_Linear 用于中间的特征处理,Linear 将输入的序列长度从 seq_len 映射到 pred_len
  • dct_norm:用于对经过 DCT 处理的频域特征进行标准化,以确保数据分布稳定,减少模型训练的波动。

3. 前向传播 forward

def forward(self, x):x = x.permute(0, 2, 1)  # 调整输入维度 (B, L, C) -> (B, C, L)b, c, l = x.size()  # 获取输入数据的维度 (Batch, Channel, Length)list = []# 对每个通道的输入数据进行 DCTfor i in range(c):freq = dct.dct(x[:, i, :])  # 对第 i 个通道应用 DCTlist.append(freq)# 将各通道的 DCT 结果堆叠在一起stack_dct = torch.stack(list, dim=1)stack_dct = torch.tensor(stack_dct)# 对 DCT 结果进行 LayerNorm 归一化stack_dct = self.dct_norm(stack_dct)# 通过全连接层对 DCT 结果进行处理f_weight = self.fc(stack_dct)f_weight = self.dct_norm(f_weight)# 保存处理后的权重用于可视化f_weight_cpu = f_weightf_weight_np = f_weight_cpu.cpu().detach().numpy()np.save('f_weight_weather_wf.npy', f_weight_np)# 将原始输入和经过处理的 DCT 权重相乘,并通过线性层进行序列预测result = self.Linear(x * f_weight)  # 结合权重后的输入数据进行预测return result.permute(0, 2, 1)  # 调整输出维度回 (B, L, C)
前向传播步骤:
  1. 维度调整:通过 permute 将输入的维度从 (B, L, C) 转换为 (B, C, L),方便后续进行 1D 操作。
  2. DCT 处理:对每个通道的数据应用 DCT(离散余弦变换),捕捉数据的频域特征。
  3. 堆叠 DCT 结果:将每个通道的 DCT 结果堆叠在一起,形成一个新的张量。
  4. 标准化:通过 LayerNorm 对 DCT 结果进行标准化,确保特征的稳定性。
  5. 全连接层处理:通过全连接层调整 DCT 结果的权重,提取出更高层次的频域特征。
  6. 可视化保存:将处理后的权重保存到 .npy 文件中,用于后续的可视化分析。
  7. 预测结果生成:将输入数据和处理后的权重相乘,然后通过线性层输出预测结果。
  8. 维度恢复:最后将输出的维度恢复为 (B, L, C),与输入数据的形状保持一致。

4. 总结

  • DCT 应用:该模型在每个通道上应用了 DCT,能够提取出输入数据中的频域特征,这对于具有周期性或频率特征的时间序列数据非常有效。
  • 全连接层权重调整:通过全连接层对 DCT 特征进行加权处理,有助于模型学习不同频率成分对预测的影响。
  • 标准化:使用 LayerNorm 保证频域特征在不同通道之间的平衡,避免某些通道的特征在模型中过度放大或缩小。

这个模型结合了 DCT 和神经网络的优势,通过频域特征处理提高了对复杂时间序列数据的预测能力,特别适合具有周期性变化的时间序列任务。


 

run_longExp.py代码:

这段代码是一个基于命令行参数的实验框架,用于时间序列预测任务,使用了 Transformer 系列模型(如 Autoformer、Informer、Transformer)。通过命令行参数,可以灵活调整模型、数据集、超参数等配置,执行训练、测试和预测流程。


参考:

用于时间序列预测的频率增强信道注意力机制(Python代码实现)_fecam-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/430458.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DANN GRL

域自适应是指在目标域与源域的数据分布不同但任务相同下的迁移学习&#xff0c;从而将模型在源域上的良好性能迁移到目标域上&#xff0c;极大地缓解目标域标签缺失严重导致模型性能受损的问题。 介绍一篇经典工作 DANN &#xff1a; 模型结构 在训练阶段需要预测如下两个任务…

委托的注册及注销+观察者模式

事件 委托变量如果公开出去&#xff0c;很不安全&#xff0c;外部可以随意调用 所以取消public,封闭它&#xff0c;我们可以自己书写两个方法&#xff0c;供外部注册与注销&#xff0c;委托调用在子方法里调用&#xff0c;这样封装委托变量可以使它更安全&#xff0c;这个就叫…

SpringBoot3核心特性-核心原理

目录 传送门前言一、事件和监听器1、生命周期监听2、事件触发时机 二、自动配置原理1、入门理解1.1、自动配置流程1.2、SPI机制1.3、功能开关 2、进阶理解2.1、 SpringBootApplication2.2、 完整启动加载流程 三、自定义starter1、业务代码2、基本抽取3、使用EnableXxx机制4、完…

JAVA自助高效安全无人台球茶室棋牌室系统小程序源码

​探索“自助高效安全无人台球茶室棋牌室系统”的奇妙之旅 &#x1f3b1;&#x1f375;&#x1f3b2; &#x1f50d; 初见惊艳&#xff1a;未来娱乐新体验 &#x1f50d; 走进这家无人值守的台球茶室棋牌室&#xff0c;第一感觉就像是穿越到了未来&#xff01;没有繁琐的前台登…

如何利用 opencv 进行 ROI(感兴趣)获取和实现 VR(虚拟现实) 演播室的播放

我是从事医疗软件的开发的。 经常需要从拍摄的医疗视频中获取出病理区域。并计算病理区域的周长和面积。 用 opencv 的术语,这就是感兴趣区域的获取。 (因为都是实时视频,所以速度很关键。代码效率很重要) 有时,需要标注出病理区域,并将非病理区域从视频中去除掉。 如果将…

中电金信 :基于开放架构的私有云建设实践

01开放架构私有云诞生背景 随着国产化创新建设的深化&#xff0c;产业侧行业软件持续进行云原生改造&#xff0c;金融机构拥抱云和容器技术&#xff0c;实现数智化转型已是大势所趋。近年&#xff0c;云原生技术以及架构发展速度更是惊人&#xff0c;私有云开始有了新架构、有了…

小柴冲刺软考中级嵌入式系统设计师系列一、计算机系统基础知识(6)可靠性与系统性能评测基础

目录 1、计算机可靠性 串联系统 并联系统 2、计算机系统的性能评价 性能评测的常用方法 基准测试程序 flechazohttps://www.zhihu.com/people/jiu_sheng 小柴冲刺嵌入式系统设计师系列总目录https://blog.csdn.net/qianshang52013/article/details/139975720?spm1001.2…

潮玩宇宙大逃杀宝石游戏搭建开发

潮玩宇宙大逃杀的开发主要涉及以下方面&#xff1a; 1. 游戏概念和设计&#xff1a; 核心概念定义&#xff1a;确定以潮玩为主题的宇宙背景、游戏的基本规则和目标。例如&#xff0c;玩家在宇宙场景中参与大逃杀竞技&#xff0c;目标是成为最后存活的玩家。 玩法模式设计&a…

基于Es和智普AI实现的语义检索

1、什么是语义检索 语义检索是一种利用自然语言处理&#xff08;NLP&#xff09;和人工智能&#xff08;AI&#xff09;技术来理解搜索查询的语义&#xff0c;以提供更准确和相关搜索结果的搜索技术&#xff0c;语义检索是一项突破性的技术&#xff0c;旨在通过深入理解单词和…

如何使用GLib的单向链表GSList

单向链表是一种基础的数据结构&#xff0c;也是一种简单而灵活的数据结构&#xff0c;本文讨论单向链表的基本概念及实现方法&#xff0c;并着重介绍使用GLib的GList实现单向链表的方法及步骤&#xff0c;本文给出了多个实际范例源代码&#xff0c;旨在帮助学习基于GLib编程的读…

uni-app快速入门

目录 一、什么是 uni-app二、快速创建 uni-app 项目1.创建 uni-app2.运行 uni-app 三、uni-app 相对传统 H5 的变化1.网络模型的变化2.文件类型变化3.文件内代码架构的变化4.外部文件引用方式变化5.组件/标签的变化6.js的变化&#xff08;1&#xff09;运行环境从浏览器变成v8引…

Leetcode—329. 矩阵中的最长递增路径【困难】

2024每日刷题&#xff08;165&#xff09; Leetcode—329. 矩阵中的最长递增路径 dfs dp实现代码 class Solution { public:int longestIncreasingPath(vector<vector<int>>& matrix) {// 9 9 4// 6 6 8// 2 1 1// 1 1 2// 2 2 1// 3 4 2int m …

fastadmin 根据选择数据来传参给selectpage输入框

文章目录 js代码php代码&#xff1a;完结 js代码 $(document).on(change,#table .bs-checkbox [type"checkbox"],function(){let url$(#chuancan).attr(data-url)urlurl.split(?)[0]let idsTable.api.selectedids(table)if(ids.length){let u_id[]ids.forEach(eleme…

CSS文档流以及脱离文档流的方法

文档流 文档流是文档中可显示对象在排列时占用的位置/空间。例如&#xff1a;块元素自上而下摆放&#xff0c;内联元素从左到右摆放。&#xff08;文档流中限制非常的多&#xff0c;导致很多页面效果无法实现)。 常见文档流限制 高低不齐&#xff0c;底边对齐 <head>&…

02【Matlab系统辨识】白噪声

1.白噪声与有色噪声 1.1 白噪声(white noise) 系统辨识中所用到的数据通常都含有噪声。从工程实际出发&#xff0c;这种噪声往往可以视为具有有理谱密度的平稳随机过程。白噪声是一种最简单的随机过程&#xff0c;是由一系列不相关的随机变量组成的理想化随机过程。白噪声的数…

构建数据分析模型,及时回传各系统监控监测数据进行分析反馈响应的智慧油站开源了。

AI视频监控平台简介 AI视频监控平台是一款功能强大且简单易用的实时算法视频监控系统。它的愿景是最底层打通各大芯片厂商相互间的壁垒&#xff0c;省去繁琐重复的适配流程&#xff0c;实现芯片、算法、应用的全流程组合&#xff0c;从而大大减少企业级应用约95%的开发成本。增…

【深度学习】03-神经网络2-1损失函数

在神经网络中&#xff0c;不同任务类型&#xff08;如多分类、二分类、回归&#xff09;需要使用不同的损失函数来衡量模型预测和真实值之间的差异。选择合适的损失函数对于模型的性能至关重要。 这里的是API 的注意⚠️&#xff0c;但是在真实的公式中&#xff0c;目标值一定是…

面向对象 vs 面向过程

Java 和 C 语言的区别&#xff1a;面向对象 vs 面向过程 在编程世界中&#xff0c;不同的编程语言承载着不同的编程范式。C 语言作为一门经典的面向过程编程语言&#xff0c;注重函数的调用和操作&#xff1b;而Java则是典型的面向对象编程语言&#xff0c;重视对象与类的设计…

科技云报到:以数据“价值三角”为擎,探索数据治理实践路径

科技云报到原创。 过去四十年&#xff0c;经济发展主要来自于土地、劳动力、农业技术、工业技术等要素的充分释放。面向数字经济时代&#xff0c;无论是大模型、自动驾驶还是具身智能、人形机器人&#xff0c;数据已然成为继土地、劳动、资本和技术之后的又一种战略资产和新型…