Table of Contents
- Transformer class
- Dataset class
- Training function
- Testing function
- Plotting
- Computing metrics
- Reading data
- Timing
- Start training
Dataset source: https://github.com/zhouhaoyi/ETDataset
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import math
import time
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from matplotlib_inline import backend_inline
backend_inline.set_matplotlib_formats('svg')
Transformer class
Only the encoder is used: its output is flattened and passed through linear layers to produce the prediction, so the encoder can be understood as simply transforming the raw input features.
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        # precompute the sinusoidal positional encodings once
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)  # [max_len, 1, d_model]
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]  # x: [seq_length, batch_size, d_model]


class TransformerTimeSeriesModel(nn.Module):
    def __init__(self, input_size, output_size, seq_length, label_length,
                 d_model=256, nhead=8, num_layers=2, dropout=0.5):
        '''input_size, output_size, seq_length, label_length are the input dimension, output dimension,
        number of historical time steps, and number of prediction steps, respectively.'''
        super(TransformerTimeSeriesModel, self).__init__()
        self.label_length = label_length
        self.output_size = output_size
        self.src_mask = None
        self.embedding = nn.Linear(input_size, d_model)
        self.pos_coding = PositionalEncoding(d_model)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
                                                        dim_feedforward=4 * d_model, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.fc1 = nn.Linear(seq_length * d_model, label_length * d_model)
        self.fc2 = nn.Linear(label_length * d_model, label_length * output_size)
        self.init_weights()

    def forward(self, src):
        # src: [seq_length, batch_size, input_size]
        if self.src_mask is None:
            device = src.device
            mask = self._generate_square_subsequent_mask(len(src)).to(device)
            self.src_mask = mask
        src = self.embedding(src)
        src = self.pos_coding(src)
        en_output = self.transformer_encoder(src, self.src_mask)     # [seq_length, batch_size, d_model]
        en_output = en_output.permute(1, 0, 2)                       # [batch_size, seq_length, d_model]
        en_output = en_output.reshape(en_output.size(0), -1)         # [batch_size, seq_length * d_model]
        output = self.fc1(en_output)                                 # [batch_size, label_length * d_model]
        output = self.fc2(output)                                    # [batch_size, label_length * output_size]
        return output.view(-1, self.label_length, self.output_size)  # [batch_size, label_length, output_size]

    def init_weights(self):
        initrange = 0.1
        for module in self.modules():
            if isinstance(module, nn.Linear):
                module.weight.data.uniform_(-initrange, initrange)
                if module.bias is not None:
                    module.bias.data.zero_()

    def _generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask
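As a quick sanity check on the tensor shapes flowing through this model, a minimal smoke test such as the following can be run. This snippet is not from the original post; the batch size of 4 and the hyperparameters are arbitrary illustrative values, and the underscore-prefixed names are throwaway variables.

# Illustrative shape check (not part of the original post); values chosen arbitrarily.
_seq_len, _label_len, _in_size, _out_size = 96, 24, 7, 1
_model = TransformerTimeSeriesModel(_in_size, _out_size, _seq_len, _label_len,
                                    d_model=20, nhead=1, num_layers=1, dropout=0.1)
_src = torch.randn(_seq_len, 4, _in_size)   # [seq_length, batch_size, input_size]
print(_model(_src).shape)                   # expected: torch.Size([4, 24, 1])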
Dataset class
class get_dataset(Dataset):
    def __init__(self, data_path, seq_length, label_length, features, train_split, mode):
        self.mode = mode
        self.data_path = data_path
        self.features = features
        self.seq_length = seq_length
        self.label_length = label_length
        self.data, self.data_mean, self.data_std = self.get_data()
        print('self.data.shape:', self.data.shape)
        # print(self.data[0, :self.seq_length, :].shape)      # torch.Size([96, 7])
        # print(self.data[0, -self.label_length:, -1].shape)  # torch.Size([24])
        train_num = int(train_split * len(self.data))
        if self.mode == 'train':
            print('train_samples_num:', train_num)
            self.data = self.data[:train_num, :, :]
        else:
            print('test_samples_num:', len(self.data) - train_num)
            self.data = self.data[train_num:, :, :]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        en_input = self.data[index, :self.seq_length, :]                # [seq_length, num_features]
        label = self.data[index, -self.label_length:, -1].unsqueeze(1)  # [label_length, 1], the 'OT' column
        return en_input, label

    def get_data(self):
        data = pd.read_csv(self.data_path)
        data.index = pd.to_datetime(data['date'])
        data = data.drop('date', axis=1)
        # z-score normalization using the statistics of the whole file
        data_mean = data.mean()
        data_std = data.std()
        data = (data - data_mean) / data_std
        num_sample = len(data) - self.seq_length - self.label_length + 1
        print('len(data):', len(data), 'num_sample:', num_sample)
        print('len(self.features):', len(self.features))
        seq_data = torch.zeros(num_sample, self.seq_length + self.label_length, len(self.features))
        # print(data.iloc[0:0 + self.seq_length + 1, [*range(len(self.features))]].values)
        for i in range(num_sample):
            seq_data[i] = torch.tensor(data.iloc[i:i + self.seq_length + self.label_length,
                                                 [*range(len(self.features))]].values)
        return seq_data, data_mean, data_std
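To double-check what a single sample from this dataset looks like, a quick inspection along these lines can be run. This is illustrative only and not from the original post; it assumes the same ETTh1.csv path and settings that are used further below, and the underscore-prefixed names are throwaway variables.

# Illustrative check of one sample's shapes (not part of the original post).
# Assumes the ETTh1.csv path and the settings used later in this post.
_ds = get_dataset('/kaggle/input/example-dataset/ETTh1.csv', seq_length=96, label_length=24,
                  features=['HUFL', 'HULL', 'MUFL', 'MULL', 'LUFL', 'LULL', 'OT'],
                  train_split=0.8, mode='train')
_en_input, _label = _ds[0]
print(_en_input.shape, _label.shape)  # expected: torch.Size([96, 7]) torch.Size([24, 1])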
Training function
def train(model, dataset, epochs, optim, loss_function, batch_size, shuffle=True):
    print('training on :', device)
    data_loader = DataLoader(dataset, batch_size = batch_size, shuffle=shuffle)
    val_i = np.random.randint(0, 3400, 5)  # fixed indices of 5 validation samples to inspect after each epoch
    for epoch in range(epochs):
        train_loss = 0
        model.train()
        for x, label in data_loader:
            # print(x.shape, label.shape)  # torch.Size([32, 96, 7]) torch.Size([32, 24, 1])
            x, label = x.permute(1, 0, 2).to(device), label.to(device)  # x -> [seq_length, batch_size, input_size]
            pred = model(x)  # torch.Size([32, 24, 1])
            loss = loss_function(pred, label)
            optim.zero_grad()
            loss.backward()
            optim.step()
            train_loss += loss.item()
        train_loss /= len(data_loader)
        print(f'[{timeSince(start)}] ', end='')
        print('epoch: %d, lr: %.8f, train loss : %.8f' % (epoch + 1, scheduler.get_last_lr()[0], train_loss), end='')
        scheduler.step()
        pred_array, true_array = test(model, dataset_test, batch_size, shuffle=False)
        for i in val_i:
            print(f'MAE of sample {i+1} on the validation set:', calculate_mae(pred_array, true_array, i=i))
            draw_one_sample(pred_array, true_array, i=i)
Testing function
def test(model, dataset, batch_size, shuffle=False):
    model.eval()
    val_loss = 0.
    data_loader = DataLoader(dataset, batch_size, shuffle=shuffle)
    # print(dataset.data_mean, dataset.data_std)
    pred_list = []
    true_list = []
    with torch.no_grad():
        for x, label in data_loader:
            # print(x.shape, label.shape)  # torch.Size([32, 96, 7]) torch.Size([32, 24, 1])
            x, label = x.permute(1, 0, 2).to(device), label.to(device)
            pred = model(x)  # torch.Size([32, 24, 1])
            loss = loss_function(pred, label)
            val_loss += loss.item()
            pred = pred.squeeze(2).detach().cpu().numpy()
            true = label.squeeze(2).detach().cpu().numpy()
            # print(pred.shape, true.shape)  # (32, 24) (32, 24)
            # de-normalize predictions and targets back to the original scale of the 'OT' column
            pred = pred * dataset.data_std['OT'] + dataset.data_mean['OT']
            true = true * dataset.data_std['OT'] + dataset.data_mean['OT']
            x_true = x.permute(1, 0, 2)[:, :, -1].detach().cpu().numpy()  # (32, 96), historical 'OT' values
            x_true = x_true * dataset.data_std['OT'] + dataset.data_mean['OT']
            combined = np.concatenate((x_true, true), axis=1)  # (32, 120)
            pred_list.append(pred)
            true_list.append(combined)
    # print(len(pred_list))  # 109; since 109 * 32 = 3488 > 3461 (the number of validation samples), the last batch has fewer than 32 samples
    pred_array = np.vstack(pred_list)  # shape: (num_validation_samples, 24)
    true_array = np.vstack(true_list)  # shape: (num_validation_samples, 120)
    print(' val loss : %.8f' % (val_loss / len(data_loader)))
    return pred_array, true_array
Plotting
def draw_one_sample(pred_array, true_array, i=0):
    '''
    Plot the prediction for the i-th sample of the validation set.
    pred_array.shape: (num_validation_samples, 24), where 24 is the number of prediction steps
    true_array.shape: (num_validation_samples, 120), where the first 96 steps are historical data and the last 24 steps are the target true values
    '''
    pred = pred_array[i]  # shape: (24,)
    true = true_array[i]  # shape: (120,)
    historical_true = true[:seq_length]  # first 96 steps: historical data
    target_true = true[seq_length:]      # last 24 steps: target true values
    plt.figure(figsize=(8, 4))
    plt.plot(historical_true, label='Historical Data', color='blue', marker='o', markersize=3)
    plt.plot(range(seq_length, seq_length + label_length), target_true, label='Target True Values', color='green', marker='o', markersize=3)
    plt.plot(range(seq_length, seq_length + label_length), pred, label='Predicted Values', color='red', linestyle='--', marker='x', markersize=3)
    plt.legend()
    plt.title(f'Prediction vs True Values for Sample {i+1}')
    plt.xlabel('Time Steps')
    plt.ylabel('Values')
    plt.grid()
    plt.show()
Computing metrics
def calculate_mae(pred_array, true_array, i):
    """
    Compute the mean absolute error (MAE) for the i-th validation sample.

    Args:
        pred_array: np.ndarray, predictions, shape (num_validation_samples, 24)
        true_array: np.ndarray, ground truth, shape (num_validation_samples, 120)
        i: int, index of the sample to evaluate

    Returns:
        mae: float, mean absolute error
    """
    # take the target part of the ground truth (the last 24 time steps)
    true_values = true_array[i, -label_length:]
    mae = np.mean(np.abs(pred_array[i, :] - true_values))
    return mae
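The post only reports per-sample MAE. If a single aggregate number over the whole validation set is wanted, a vectorized variant along these lines could be used. This is a sketch that is not from the original post; calculate_mae_all is a hypothetical helper, and it relies on the same global label_length as calculate_mae above.

def calculate_mae_all(pred_array, true_array):
    """Sketch (not in the original post): MAE averaged over all validation samples and all prediction steps."""
    true_values = true_array[:, -label_length:]       # (num_samples, 24), target part of the ground truth
    return np.mean(np.abs(pred_array - true_values))

# Example usage once test() has produced pred_array and true_array:
# print('overall validation MAE:', calculate_mae_all(pred_array, true_array))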
Reading data
seed = 0
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

seq_length = 96
label_length = 24
features = ['HUFL','HULL','MUFL','MULL','LUFL','LULL','OT'] # 'OT' is the target column; its own history helps predict its future values, so it is also used as a feature
input_size = len(features) # 7
output_size = 1
train_split = 0.8
data_path = '/kaggle/input/example-dataset/ETTh1.csv'

dataset_train = get_dataset(data_path, seq_length, label_length, features, train_split = train_split, mode = 'train')
dataset_test = get_dataset(data_path, seq_length, label_length, features, train_split = train_split, mode = 'test')
len(data): 17420 num_sample: 17301
len(self.features): 7
self.data.shape: torch.Size([17301, 120, 7])
train_samples_num: 13840
len(data): 17420 num_sample: 17301
len(self.features): 7
self.data.shape: torch.Size([17301, 120, 7])
test_samples_num: 3461
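As a sanity check on these numbers: each sample needs seq_length + label_length = 96 + 24 = 120 consecutive rows, so num_sample = 17420 - 120 + 1 = 17301. The split then gives int(0.8 * 17301) = 13840 training samples and 17301 - 13840 = 3461 test samples, matching the printed counts.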
Timing
def timeSince(since):
    now = time.time()
    s = now - since
    m = math.floor(s / 60)  # math.floor() rounds down to the nearest integer
    s -= m * 60
    return '%dmin %ds' % (m, s)
Start training
epochs = 24
lr = 0.001
batch_size = 32
d_model = 20
nhead = 1
num_layers = 1
dropout = 0.1
model = TransformerTimeSeriesModel(input_size, output_size, seq_length, label_length, d_model, nhead, num_layers, dropout = dropout).to(device)
optim = torch.optim.AdamW(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optim, 1.0, gamma=0.98)
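# Note: StepLR with step_size=1 and gamma=0.98 multiplies the learning rate by 0.98 after every epoch.
# By the time the log line for epoch 24 is printed, the scheduler has been stepped 23 times,
# so lr = 0.001 * 0.98**23 ≈ 0.00062835, which matches the lr shown in the training log below.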
loss_function = nn.MSELoss()
start = time.time()
train(model, dataset_train, epochs, optim, loss_function, batch_size, shuffle = True)
# torch.save(model.state_dict(), 'transformer.pth')
# pred_array, true_array = test(model, dataset_test, batch_size, shuffle=False)
# (a sketch for reloading the saved weights appears after the training log below)
[1min 33s] epoch: 24, lr: 0.00062835, train loss : 0.65996791 val loss : 0.26256496
MAE of sample 2733 on the validation set: 0.7049041
MAE of sample 2608 on the validation set: 1.7675642
MAE of sample 1654 on the validation set: 1.8673252
MAE of sample 3265 on the validation set: 0.9914896
MAE of sample 836 on the validation set: 2.9960492
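If the commented-out torch.save line above is used to save the weights, they can later be reloaded for inference roughly as follows. This is a sketch that is not from the original post; it assumes the file name 'transformer.pth' from that commented line, and model_loaded is a hypothetical name.

# Sketch: reload saved weights and re-run evaluation on the test split (assumes 'transformer.pth' was saved above).
model_loaded = TransformerTimeSeriesModel(input_size, output_size, seq_length, label_length,
                                          d_model, nhead, num_layers, dropout = dropout).to(device)
model_loaded.load_state_dict(torch.load('transformer.pth', map_location=device))
model_loaded.eval()
pred_array, true_array = test(model_loaded, dataset_test, batch_size, shuffle=False)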
Overall, the Transformer's performance on the ETTh1 dataset feels mediocre.
Finally, here is a good article I came across on Zhihu today:
"[Long read] Long-sequence forecasting & spatio-temporal forecasting: have you been troubled by these problems? An exploration of research progress in multivariate time series forecasting!"