基于MLP算法实现交通流量预测(Pytorch版)

在海量的城市数据中,交通流量数据无疑是揭示城市运行脉络、洞察出行规律的关键要素之一。实时且精准的交通流量预测不仅能为交通规划者提供科学决策依据,助力提升道路使用效率、缓解交通拥堵,还能为公众出行提供参考,实现个性化导航服务,进而提升整个城市的运行效能。然而,交通流量受到众多因素的交织影响,如天气变化、特殊事件、节假日效应、时段特性等,其动态变化规律呈现出显著的非线性、时变性和不确定性,这无疑给预测工作带来了巨大挑战。

在此背景下,机器学习技术,尤其是深度学习方法,凭借其强大的模型构建能力和对复杂非线性关系的出色捕捉能力,逐渐崭露头角,成为交通流量预测领域的研究热点。其中,多层感知器(MLP)作为一类基础而经典的前馈型人工神经网络,以其简洁的结构、灵活的适应性和良好的泛化性能,在处理高维、非线性问题上展现出独特优势。 MLP通过模拟人脑神经元的工作机制,通过多层非线性变换对输入数据进行深度抽象和特征学习,能够有效挖掘交通流量数据背后的复杂关联与潜在模式,从而实现对未来的流量状态进行精准预测。

模型简介

多层感知器(Multi-Layer Perceptron, MLP)是一种基础且广泛使用的前馈型人工神经网络模型,它是深度学习领域的重要组成部分,尤其在处理非线性关系和模式识别问题中表现出强大的能力。MLP的核心特征在于其多层结构,由多个神经元组成的隐藏层与输入层和输出层相互连接,形成一个分层信息处理系统。下面是对MLP的详细介绍:

基本结构与组件:

  1. 输入层(Input Layer):接收原始数据作为网络的输入。这些数据通常是经过预处理后的数值向量,代表了待解决问题中的各种特征或变量。

  2. 隐藏层(Hidden Layers):位于输入层与输出层之间的中间层。MLP可以有一个或多个隐藏层,每个隐藏层包含多个神经元。隐藏层的主要功能是通过非线性变换对输入数据进行复杂的特征提取和表示学习,从而捕获数据中的潜在关系和模式。神经元之间通常是全连接的,即每个隐藏层神经元接收到前一层所有神经元的加权输出。

  3. 输出层(Output Layer):产生网络的最终输出,根据任务需求可以是一维或多维的。在交通流量预测等回归任务中,输出层通常只有一个神经元,给出连续的预测值;而在分类任务中,输出层神经元数量对应类别数,每个神经元的激活值代表对应类别的概率。

工作原理:

  1. 加权求和与激活函数:每个神经元接收前一层所有神经元的输出(或输入层的原始数据),对这些输入进行加权求和,加上一个偏置项后,通过一个非线性激活函数进行转换。常见的激活函数包括Sigmoid、Tanh、ReLU及其变种等。激活函数引入非线性,使得网络能够表达复杂的非线性关系。

  2. 前向传播(Forward Propagation):信息从输入层依次经过隐藏层直至输出层的过程称为前向传播。在这个过程中,数据通过各层神经元的加权求和和激活函数计算,逐步形成更高级的特征表示,最终在输出层生成预测结果。

  3. 反向传播(Backpropagation):当网络进行预测后,会计算预测结果与实际标签之间的误差(损失函数)。反向传播算法根据这个误差,从输出层开始,逐层反向调整各层神经元的权重和偏置,以最小化总体误差。这是通过链式法则计算梯度并应用梯度下降或其变种算法实现的。反向传播确保了网络在训练过程中能够自我学习并逐步改善其预测性能。

训练与应用:

  1. 训练过程:给定标记好的训练数据集,MLP通过迭代执行前向传播和反向传播,更新模型参数,直到达到预设的停止条件(如达到一定迭代次数、损失函数收敛或验证集性能不再提升等)。训练过程中可能涉及正则化、批量归一化、dropout等技术防止过拟合。

  2. 应用领域:MLP因其灵活性和普适性,被广泛应用于各种领域,如图像识别、语音识别、自然语言处理、时间序列预测(如交通流量预测)、金融风险评估、生物医学信号分析等。在交通流量预测中,MLP可以接收历史流量数据、气象信息、节假日标志等多元输入,学习并建模这些因素与未来流量之间的复杂非线性关系,从而做出准确预测。

综上所述,多层感知器(MLP)作为一种基础的前馈神经网络模型,通过多层非线性变换对输入数据进行抽象和学习,适用于各种非线性预测和分类任务。其训练过程依赖于反向传播算法来优化模型参数,使其能够捕捉数据中的复杂模式,并在诸多实际应用场景中展现出强大的预测性能。在交通流量预测中,MLP能够整合多种影响因素,为交通管理者提供有价值的决策支持。

模型构建

class MLP(nn.Module):def __init__(self, input_size, hidden_sizes, output_size):super(MLP, self).__init__()self.input_size = input_sizeself.hidden_sizes = hidden_sizesself.output_size = output_sizelayers = []prev_size = input_sizefor hidden_size in hidden_sizes:layers.append(nn.Linear(prev_size, hidden_size))layers.append(nn.Sigmoid())prev_size = hidden_sizelayers.append(nn.Linear(prev_size, output_size))self.model = nn.Sequential(*layers)def forward(self, x):# [Batch, Input_len, Node] --> [Batch, Node, Input_len]x = x.permute(0, 2, 1)y = self.model(x)# [Batch, Node, Output_len] --> [Batch, Output_len, Node]y = y.permute(0, 2, 1)return y

其中,input_size代表输入层节点数目,hidden_sizes为隐藏层的节点数目,如[24, 36, 24]则代表有3层隐藏层,隐藏层的节点数目分别为24、36、24,output_size则为输出层节点数目。

一个典型的MLP神经网络如下图所示:

在这里插入图片描述

在时间序列任务中,考虑到数据变化的趋势性,认为未来时间窗的特征和当前时刻的前置时间窗特征相关性较大。

在这里插入图片描述

在单变量的预测场景下,假设时间窗为5分钟,用最近12个时间窗的流量预测未来3个时间窗的流量,则可以将最近12个时间窗的流量作为输入特征,此时输入层的节点数目为12,而需要预测的未来的3个时间窗的流量作为输出特征,此时输出层的节点数目为3。

编写个主函数验证下网络shape变化是否符合预期:

if __name__ == '__main__':parser = argparse.ArgumentParser()parser.add_argument('--window_size', type=int, default=12)parser.add_argument('--horizon', type=int, default=3)parser.add_argument('--hidden_sizes', type=list, default=[24, 36, 24])args = parser.parse_args()model = DNN(input_size=args.window_size, hidden_sizes=args.hidden_sizes, output_size=args.horizon)print(model)x = torch.randn(8, 12, 96)y = model(x)print(y.shape)

输出为:

MLP((model): Sequential((0): Linear(in_features=12, out_features=24, bias=True)(1): Sigmoid()(2): Linear(in_features=24, out_features=36, bias=True)(3): Sigmoid()(4): Linear(in_features=36, out_features=24, bias=True)(5): Sigmoid()(6): Linear(in_features=24, out_features=3, bias=True))
)
torch.Size([8, 3, 96])

输入张量的shape为[batch_size, input_len, node],node代表不同的序列,如果是交通流量预测,代表的是不同路段或交叉口。

输出张量的shape为[batch_size, output_len, node]。

用最近12个时间窗的流量预测未来3个时间窗的流量,输入张量的shape为[8, 12, 96],输出张量的shape为[8, 3, 96],符合预期。

那如果是多变量的预测场景呢?

其实,也很简单,假设时间窗为5分钟,用最近12个时间窗的流量、速度、占有率来预测未来3个时间窗的流量,此时输入特征为最近12个时间窗的流量、速度、占有率,有12*3=36个特征,故输入层的节点数目为36,由于输出还是流量这个单变量,所以输出层的节点数目还是3。

数据输入

本文以PEMS数据集作为算法的训练、验证和测试数据。

PEMS数据集是针对加利福尼亚州不同区域高速公路网络收集的交通数据,数据集可能包含多个传感器站点的数据,每个站点每五分钟记录了特定路段或路口的交通状况,包括但不限于:

  • 流量

单位时间内通过某路段或交叉口的车辆数量,反映道路的使用程度。

  • 速度

车辆在道路上行驶的平均速度,用于评估道路的运行效率和拥堵状况。

  • 占有率

车道被车辆占用的比例,是衡量道路拥挤程度的另一个重要指标。

PEMS03数据,26208*358*3,358个检测器,26208个5分钟时间窗(2012/5/1开始,91天),3个变量分别为流量、速度和占有率。PEMS04数据,16992*307*3,307个检测器,16992个5分钟时间窗(2017/7/1开始,59天),3个变量分别为流量、速度和占有率。PEMS07数据,28224*883*3,883个检测器,28224个5分钟时间窗(2017/5/1开始,98天),3个变量分别为流量、速度和占有率。PEMS08数据,17856*170*3,170个检测器,17856个5分钟时间窗(2012/3/1开始,62天),3个变量分别为流量、速度和占有率。

读取数据后,可以将数据传入自定义的DataSet,以方便后续的训练:

class ForecastDataset(torch_data.Dataset):def __init__(self, df, window_size, horizon, normalize_method=None, norm_statistic=None, interval=1):self.window_size = window_size # 12self.interval = interval  #1self.horizon = horizonself.normalize_method = normalize_methodself.norm_statistic = norm_statisticdf = pd.DataFrame(df)df = df.fillna(method='ffill', limit=len(df)).fillna(method='bfill', limit=len(df)).valuesself.data = dfself.df_length = len(df)self.x_end_idx = self.get_x_end_idx()if normalize_method:self.data, _ = normalized(self.data, normalize_method, norm_statistic)def __getitem__(self, index):hi = self.x_end_idx[index] #12lo = hi - self.window_size #0train_data = self.data[lo: hi] #0:12target_data = self.data[hi:hi + self.horizon] #12:24x = torch.from_numpy(train_data).type(torch.float)y = torch.from_numpy(target_data).type(torch.float)return x, ydef __len__(self):return len(self.x_end_idx)def get_x_end_idx(self):# each element `hi` in `x_index_set` is an upper bound for get training data# training data range: [lo, hi), lo = hi - window_sizex_index_set = range(self.window_size, self.df_length - self.horizon + 1)x_end_idx = [x_index_set[j * self.interval] for j in range((len(x_index_set)) // self.interval)]return x_end_idxdef normalized(data, normalize_method, norm_statistic=None):if normalize_method == 'min_max':if not norm_statistic:norm_statistic = dict(max=np.max(data, axis=0), min=np.min(data, axis=0))scale = norm_statistic['max'] - norm_statistic['min'] + 1e-5data = (data - norm_statistic['min']) / scaledata = np.clip(data, 0.0, 1.0)elif normalize_method == 'z_score':if not norm_statistic:norm_statistic = dict(mean=np.mean(data, axis=0), std=np.std(data, axis=0))mean = norm_statistic['mean']std = norm_statistic['std']std = [1 if i == 0 else i for i in std]data = (data - mean) / stdnorm_statistic['std'] = stdreturn data, norm_statisticdef de_normalized(data, normalize_method, norm_statistic):if normalize_method == 'min_max':if not norm_statistic:norm_statistic = dict(max=np.max(data, axis=0), min=np.min(data, axis=0))scale = norm_statistic['max'] - norm_statistic['min'] + 1e-8data = data * scale + norm_statistic['min']elif normalize_method == 'z_score':if not norm_statistic:norm_statistic = dict(mean=np.mean(data, axis=0), std=np.std(data, axis=0))mean = norm_statistic['mean']std = norm_statistic['std']std = [1 if i == 0 else i for i in std]data = data * std + meanreturn data

上述代码定义了一个名为ForecastDataset的类,它是基于torch_data.Dataset的子类,用于处理时间序列预测任务的数据集。同时,还提供了normalizedde_normalized两个函数,分别用于数据的标准化(归一化)和反标准化。

ForecastDataset类

  1. 初始化方法__init__

    • 输入参数:
      • df:原始数据。
      • window_size:滑动窗口大小,用于截取历史数据作为模型训练的输入。
      • horizon:预测步长,即模型需要预测未来多少个时间点或时间窗的数据。
      • normalize_method:数据标准化方法,可选值为'min_max'(最小最大值归一化)或'z_score'(Z-score标准化)。
      • norm_statistic:若已知数据的统计信息(如最大值、最小值、均值、标准差),可直接传入;否则,将根据数据计算这些统计量。
      • interval:采样间隔。
    • 方法内部:
      • 将输入的DataFrame填充缺失值,并转化为NumPy数组。
      • 初始化类的属性:滑动窗口大小、采样间隔、预测步长、标准化方法、统计信息等。
      • 计算数据集长度和训练数据结束索引(x_end_idx),用于后续按索引获取训练数据和目标数据。
      • 如果指定了标准化方法,则对数据进行标准化处理,同时更新统计信息。
  2. __getitem__方法:

    • 输入参数:index,表示数据集中第index个样本的索引。
    • 方法内部:
      • 根据x_end_idx列表计算当前样本的训练数据起始索引lo和结束索引hi
      • 提取训练数据(历史数据)和目标数据(未来数据)。
      • 将提取到的训练数据和目标数据转化为PyTorch张量并设置为浮点类型。
      • 返回包含训练数据和目标数据的元组。
  3. __len__方法:

    • 返回数据集的样本数,即x_end_idx列表的长度。
  4. get_x_end_idx方法:

    • 该方法用于生成一个列表,其中每个元素hi表示一个训练数据结束索引。
    • 计算x_index_set,包含所有可能的训练数据结束索引(满足window_sizedf_length - horizon + 1的范围)。
    • 根据采样间隔intervalx_index_set中选取训练数据结束索引,组成x_end_idx列表并返回。

辅助函数

  1. normalized函数:

    • 输入参数:待标准化数据data、标准化方法normalize_method及统计信息字典norm_statistic(可选)。
    • 函数内部:
      • 根据指定的标准化方法进行数据标准化:
        • 若为'min_max'
          • 若未提供统计信息,计算数据的最大值和最小值,然后进行最小最大值归一化。
          • 确保数据在[0, 1]范围内。
        • 若为'z_score'
          • 若未提供统计信息,计算数据的均值和标准差,然后进行Z-score标准化。
          • 避免除以零错误,当标准差为零时将其置为1。
      • 返回标准化后的数据及更新后的统计信息字典。
  2. de_normalized函数:

    • 输入参数:已标准化数据data、标准化方法normalize_method及统计信息字典norm_statistic
    • 函数内部:
      • 根据指定的标准化方法进行数据反标准化:
        • 若为'min_max'
          • 使用提供的统计信息(最大值、最小值)进行反归一化。
        • 若为'z_score'
          • 使用提供的统计信息(均值、标准差)进行反Z-score标准化。
          • 避免除以零错误,当标准差为零时将其置为1。
      • 返回反标准化后的数据。

综上所述,上述代码实现了一个用于时间序列预测任务的数据集类ForecastDataset,支持滑动窗口、预测步长、数据标准化等功能,并提供了数据标准化与反标准化的辅助函数。

代码中,window_size即为前置时间窗的个数,horizon则为预测时间窗的个数,interval为采样间隔,默认为1。

比如现在我们有100个时间窗的历史数据,window_size为12,horizon为12,interval为1。

则第1个样本为:

x: 第1到第12个时间窗
y: 第13到第24个时间窗

第2个样本为:

x: 第2到第13个时间窗
y: 第14到第25个时间窗

以此类推,总共可构建出(his_num-window_size-horizon+1) // interval=(100-12-12+1)//1=77个样本。

同样的,若interval为2,则第1个样本为:

x: 第1到第12个时间窗
y: 第13到第24个时间窗

第2个样本为:

x: 第3到第14个时间窗
y: 第15到第26个时间窗

其他样本以此类推,可以看出,所谓的interval其实就是相邻样本之间的时间窗个数。

模型训练

为了适配不同的模型和数据集,我们采用类继承的方式来编写模型训练代码。

首先,我们定义一个父类。

class Exp_Basic(object):def __init__(self, args):self.args = argsself.device = self._acquire_device()self.model = self._build_model()def _acquire_device(self):if self.args.use_gpu:os.environ["CUDA_VISIBLE_DEVICES"] = str(self.args.gpu) if not self.args.use_multi_gpu else self.args.devicesdevice = torch.device('cuda:{}'.format(self.args.gpu))print('Use GPU: cuda:{}'.format(self.args.gpu))else:device = torch.device('cpu')print('Use CPU')return devicedef _get_data(self):pass# 创建模型def _build_model(self):raise NotImplementedErrorreturn Nonedef train(self):passdef valid(self):passdef test(self):pass

接着,编写具体的实现类Exp_MLP_PEMS(Exp_Basic)。

数据读取

首先实现_get_data方法来获取数据:

def _get_data(self):data_file = os.path.join('../../data/PEMS', self.args.dataset, self.args.dataset+'.npz')print('data file:',data_file)data = np.load(data_file,allow_pickle=True)data = data['data'][:, :, 0]train_ratio = self.args.train_length / (self.args.train_length + self.args.valid_length + self.args.test_length)valid_ratio = self.args.valid_length / (self.args.train_length + self.args.valid_length + self.args.test_length)train_data = data[:int(train_ratio * len(data))]valid_data = data[int(train_ratio * len(data)):int((train_ratio + valid_ratio) * len(data))]test_data = data[int((train_ratio + valid_ratio) * len(data)):]if len(train_data) == 0:raise Exception('Cannot organize enough training data')if len(valid_data) == 0:raise Exception('Cannot organize enough validation data')if len(test_data) == 0:raise Exception('Cannot organize enough test data')if self.args.normtype == 0:train_mean = np.mean(train_data, axis=0)train_std = np.std(train_data, axis=0)train_normalize_statistic = {"mean": train_mean.tolist(), "std": train_std.tolist()}val_mean = np.mean(valid_data, axis=0)val_std = np.std(valid_data, axis=0)val_normalize_statistic = {"mean": val_mean.tolist(), "std": val_std.tolist()}test_mean = np.mean(test_data, axis=0)test_std = np.std(test_data, axis=0)test_normalize_statistic = {"mean": test_mean.tolist(), "std": test_std.tolist()}elif self.args.normtype == 1:data_mean = np.mean(data, axis=0)data_std = np.std(data, axis=0)train_normalize_statistic = {"mean": data_mean.tolist(), "std": data_std.tolist()}val_normalize_statistic = {"mean": data_mean.tolist(), "std": data_std.tolist()}test_normalize_statistic = {"mean": data_mean.tolist(), "std": data_std.tolist()}else:train_mean = np.mean(train_data, axis=0)train_std = np.std(train_data, axis=0)train_normalize_statistic = {"mean": train_mean.tolist(), "std": train_std.tolist()}val_normalize_statistic = {"mean": train_mean.tolist(), "std": train_std.tolist()}test_normalize_statistic = {"mean": train_mean.tolist(), "std": train_std.tolist()}train_set = ForecastDataset(train_data, window_size=self.args.window_size, horizon=self.args.horizon,normalize_method=self.args.norm_method, norm_statistic=train_normalize_statistic)valid_set = ForecastDataset(valid_data, window_size=self.args.window_size, horizon=self.args.horizon,normalize_method=self.args.norm_method, norm_statistic=val_normalize_statistic)test_set = ForecastDataset(test_data, window_size=self.args.window_size, horizon=self.args.horizon,normalize_method=self.args.norm_method, norm_statistic=test_normalize_statistic)train_loader = DataLoader(train_set, batch_size=self.args.batch_size, drop_last=False, shuffle=True,num_workers=1)valid_loader = DataLoader(valid_set, batch_size=self.args.batch_size, shuffle=False, num_workers=1)test_loader = DataLoader(test_set, batch_size=self.args.batch_size, shuffle=False, num_workers=1)node_cnt = train_data.shape[1]return test_loader, train_loader, valid_loader,node_cnt,test_normalize_statistic,val_normalize_statistic

上述Python代码定义了一个名为_get_data的方法,其目的是根据传入的参数配置来读取PEMS 03/04/07/08数据集中的流量数据,进行预处理(如划分训练集、验证集、测试集,标准化),并创建相应的DataLoader对象。下面是代码的详细解读:

  1. 读取数据

    • 使用os.path.join()函数根据传入的args.dataset参数拼接数据文件路径,数据文件位于../../data/PEMS/{dataset}/{dataset}.npz目录下。
    • 使用np.load()函数加载.npz文件,其中数据以字典形式存储,键为'data'。加载后的数据是一个三维数组,第三维代表不同特征(流量、速度、占有率),这里取第一个特征(流量)作为建模数据。
    • 数据现在是一个形状为(len, node)的二维数组,其中len表示时间维度上的观测点数,node表示不同节点(如道路、路口)的数量。
  2. 划分数据集

    • 根据传入的train_lengthvalid_lengthtest_length参数,按照时间维度将数据划分为训练集、验证集和测试集。
    • 计算训练集、验证集、测试集在时间轴上的起始和结束索引,并分别切分数据。
    • 如果划分后任一数据集的长度为0,抛出异常,表示无法组织足够的数据。
  3. 数据标准化

    • 根据normtype参数选择不同的标准化方法:
      • normtype=0:分别计算训练集、验证集、测试集的均值和标准差,进行独立标准化。
      • normtype=1:使用整个数据集(训练集、验证集、测试集组合)的均值和标准差,对所有数据进行统一标准化。
      • normtype=2:仅使用训练集的均值和标准差,对训练集、验证集、测试集进行标准化。
    • 计算所需统计量(均值和标准差),并将结果存储为字典格式,如{"mean": [mean1, mean2, ...], "std": [std1, std2, ...]}
  4. 创建并返回数据集和DataLoader对象

    • 使用ForecastDataset类(未在提供的代码中定义)创建训练集、验证集、测试集对象。传入原始数据、窗口大小(window_size)、预测时域(horizon)、标准化方法(norm_method)和对应的标准化统计量。
    • 为每个数据集对象创建一个DataLoader,设置批次大小(batch_size)、是否丢弃最后一小批(drop_last)、是否打乱数据(shuffle)、并行处理的worker数(num_workers)等参数。
    • 返回测试集、训练集、验证集的DataLoader对象,以及节点数量(node_cnt)和测试集、验证集的标准化统计量。

总结:该方法完成了PEMS数据集的读取、划分、标准化,并为训练、验证、测试准备了相应的DataLoader对象,为后续模型训练和评估提供了数据支持。

构建模型

def _build_model(self):model = MLP(input_size=self.args.window_size, hidden_sizes=self.args.hidden_sizes, output_size=self.args.horizon)print(model)return model

训练

def train(self):my_optim = self._select_optimizer()my_lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=my_optim, gamma=self.args.decay_rate)test_loader, train_loader, valid_loader, node_cnt, test_normalize_statistic, val_normalize_statistic = self._get_data()forecast_loss = nn.L1Loss()best_validate_mae = np.infbest_test_mae = np.infvalidate_score_non_decrease_count = 0if self.args.resume:self.model, lr, epoch_start = load_model(self.model, self.result_file, model_name=self.args.dataset,horizon=self.args.horizon)else:epoch_start = 0for epoch in range(epoch_start, self.args.epoch):lr = adjust_learning_rate(my_optim, epoch, self.args)epoch_start_time = time.time()self.model.train()loss_total = 0cnt = 0for i, (inputs, target) in enumerate(train_loader):inputs = inputstarget = targetself.model.zero_grad()forecast = self.model(inputs)loss = forecast_loss(forecast, target)cnt += 1loss.backward()my_optim.step()loss_total += float(loss)print('| end of epoch {:3d} | time: {:5.2f}s | train_total_loss {:5.4f} '.format(epoch, (time.time() - epoch_start_time), loss_total / cnt))if (epoch + 1) % self.args.exponential_decay_step == 0:my_lr_scheduler.step()if (epoch + 1) % self.args.validate_freq == 0:is_best_for_now = Falseprint('------ validate on data: VALIDATE ------')valid_metrics = self.validate(self.model, valid_loader, self.args.norm_method,val_normalize_statistic,self.args.window_size, self.args.horizon,test=False)test_metrics = self.validate(self.model, test_loader, self.args.norm_method,test_normalize_statistic,self.args.window_size, self.args.horizon,test=True)if best_validate_mae > valid_metrics['mape']:best_validate_mae = valid_metrics['mape']is_best_for_now = Truevalidate_score_non_decrease_count = 0print('got best validation result:', valid_metrics, test_metrics)else:validate_score_non_decrease_count += 1if best_test_mae > test_metrics['mape']:best_test_mae = test_metrics['mape']print('got best test result:', test_metrics)# save modelif is_best_for_now:save_model(epoch, lr, model=self.model, model_dir=self.result_file, model_name=self.args.dataset,horizon=self.args.horizon)print('saved model!')# early stopif self.args.early_stop and validate_score_non_decrease_count >= self.args.early_stop_step:break

上述代码定义了一个名为train的方法,用于训练一个给定的模型。

  1. 初始化变量与加载模型:

    • 调用_select_optimizer方法选择优化器(optimizer)。
    • 创建指数衰减学习率调度器(learning rate scheduler),使用ExponentialLR类,指定优化器和衰减率(decay rate)。
    • 调用_get_data方法获取训练、验证、测试数据加载器、节点数量、测试数据与验证数据的标准化统计信息。
    • 定义损失函数(loss function)为L1损失(nn.L1Loss)。
    • 设置最佳验证MAE(Mean Absolute Error)和最佳测试MAE初始值为正无穷大。
    • 初始化验证分数非下降计数器(validate score non-decrease count)为0。
  2. 检查是否继续之前训练:

    • 如果args.resume参数为真(即继续之前训练),则加载模型、学习率(lr)和开始的训练轮数(epoch_start)。
    • 否则,设置epoch_start为0,从头开始训练。
  3. 主训练循环:

    • 对于每个训练轮(epoch),从epoch_startargs.epoch
      • 调整学习率(adjust_learning_rate函数)。
      • 记录当前轮开始时间。
      • 将模型设置为训练模式。
      • 初始化累计训练损失(loss_total)和样本计数器(cnt)为0。
      • 遍历训练数据加载器中的样本(enumerate(train_loader)):
        • 输入(inputs)和目标(target)保持不变。
        • 清除模型的梯度。
        • 使用模型对输入进行预测(forecast)。
        • 计算预测与目标之间的损失(loss)。
        • 更新样本计数器和累计训练损失。
        • 反向传播损失并更新模型参数。
      • 打印本训练轮的训练耗时和平均损失。
  4. 学习率调整与验证:

    • 如果当前训练轮数(epoch+1)能被指数衰减步长整除,执行学习率调度器的step方法,降低学习率。
    • 如果当前训练轮数能被验证频率整除,进行验证过程:
      • 验证模型在验证集上的表现,调用validate方法,传入模型、验证数据加载器、标准化方法、验证数据的统计信息、窗口大小、预测步长等参数,并设置test=False
      • 同样地,验证模型在测试集上的表现,此时设置test=True
      • 检查当前验证MAPE(Mean Absolute Percentage Error)是否优于历史最佳验证MAPE:
        • 如果是,则更新最佳验证MAPE、重置验证分数非下降计数器,并记录当前验证和测试结果。
      • 检查当前测试MAPE是否优于历史最佳测试MAPE,如果是,则更新最佳测试MAPE。
      • 若当前验证结果为最佳,保存模型(save_model函数),并打印提示信息。
  5. 早停条件判断:

    • 如果启用了早停(args.early_stop为真)且连续args.early_stop_step个验证周期内验证分数未下降,则跳出训练循环。

对应的_select_optimizer方法为:

def _select_optimizer(self):if self.args.optimizer == 'RMSProp':my_optim = torch.optim.RMSprop(params=self.model.parameters(), lr=self.args.lr, eps=1e-08)else:my_optim = torch.optim.Adam(params=self.model.parameters(), lr=self.args.lr, betas=(0.9, 0.999),weight_decay=self.args.weight_decay)return my_optim

验证

def validate(self, model, dataloader, normalize_method, statistic,window_size, horizon, test=False):if test:print("===================Test Normal=========================")else:print("===================Validate Normal=========================")forecast_norm, target_norm, input_norm = self.inference(model, dataloader, window_size, horizon)if normalize_method and statistic:forecast = de_normalized(forecast_norm, normalize_method, statistic)target = de_normalized(target_norm, normalize_method, statistic)else:forecast, target, input = forecast_norm, target_norm, input_normscore = evaluate(target, forecast)score_final_detail = evaluate(target, forecast, by_step=True)print('by each step: MAPE & MAE & RMSE', score_final_detail)if test:print(f'TEST: RAW : MAE {score[1]:7.2f};MAPE {score[0]:7.2f}; RMSE {score[2]:7.2f}.')else:print(f'VAL: RAW : MAE {score[1]:7.2f};MAPE {score[0]:7.2f}; RMSE {score[2]:7.2f}.')return dict(mae=score[1], mape=score[0], rmse=score[2])

上述代码定义了一个名为validate的方法,用于评估模型在给定数据集上的预测性能。

  1. 判断验证或测试模式:

    • 根据test参数的值(True或False)输出不同的提示信息,表明正在进行的是测试(Test)还是验证(Validate)。
  2. 模型推理与数据标准化恢复:

    • 调用inference方法,传入模型、数据加载器、窗口大小、预测步长,得到模型对数据集的预测结果(forecast_norm)、真实目标值(target_norm)和输入数据(input_norm)。
    • 判断是否进行了数据标准化:
      • 如果指定了标准化方法(normalize_method)且提供了统计信息(statistic):
        • 使用de_normalized函数对预测结果和真实目标值进行反标准化,恢复到原始数值范围。
      • 否则,直接使用标准化后的预测结果、真实目标值和输入数据。
  3. 计算评估指标:

    • 调用evaluate函数,传入真实目标值和模型预测结果,计算各项评估指标(MAE、MAPE、RMSE)。
  4. 输出详细评估结果:

    • 调用evaluate函数,传入额外参数by_step=True,得到按预测步长分解的各项评估指标。
    • 输出按预测步长分解的评估指标(MAPE、MAE、RMSE)。
  5. 打印总体评估结果:

    • 根据test参数的值,打印相应的测试或验证结果标签。
    • 输出总体的MAE、MAPE、RMSE值,保留两位小数。
  6. 返回评估指标字典:

    • 将计算得到的MAE、MAPE、RMSE值封装到一个字典中,以maemapermse为键,对应值为值,返回该字典。

综上,validate方法通过模型推理、数据标准化恢复、计算评估指标、输出结果等步骤,对模型在给定数据集上的预测性能进行评估,并返回评估指标的字典。根据test参数的不同,该方法可用于模型的验证或测试阶段。

测试

def test(self):test_loader, train_loader, valid_loader, node_cnt, test_normalize_statistic, val_normalize_statistic = self._get_data()model, lr, epoch = load_model(self.model, self.result_file, model_name=self.args.dataset, horizon=self.args.horizon)return self.validate(model, test_loader, self.args.norm_method, test_normalize_statistic,self.args.window_size, self.args.horizon, test=True)

实时推理

def inference(self, model, dataloader, window_size, horizon):forecast_set = []target_set = []input_set = []self.model.eval()with torch.no_grad():for i, (inputs, target) in enumerate(dataloader):inputs = inputstarget = targetinput_set.append(inputs.detach().cpu().numpy())step = 0forecast_steps = np.zeros([inputs.size()[0], horizon, inputs.size()[2]], dtype=np.float64)# 适配迭代预测和非迭代预测while step < horizon:forecast_result = model(inputs)len_model_output = forecast_result.size()[1]if len_model_output == 0:raise Exception('Get blank inference result')inputs[:, :window_size - len_model_output, :] = inputs[:, len_model_output:window_size,:].clone()inputs[:, window_size - len_model_output:, :] = forecast_result.clone()forecast_steps[:, step:min(horizon - step, len_model_output) + step, :] = \forecast_result[:, :min(horizon - step, len_model_output), :].detach().cpu().numpy()step += min(horizon - step, len_model_output)forecast_set.append(forecast_steps)target_set.append(target.detach().cpu().numpy())return np.concatenate(forecast_set, axis=0), np.concatenate(target_set, axis=0), np.concatenate(input_set,axis=0)

上述代码定义了一个名为inference的方法,用于执行模型在给定数据集上的推理(预测)。

  1. 初始化变量:

    • 创建三个空列表forecast_settarget_setinput_set,分别用于存储模型预测结果、真实目标值和输入数据。
  2. 设置模型为评估模式并禁用梯度计算:

    • 将模型设为评估模式(self.model.eval()),避免在推理过程中进行不必要的前向传播计算。
    • 使用with torch.no_grad():语句块,确保在该块内的计算不记录梯度,节省内存并提高推理速度。
  3. 遍历数据加载器中的样本:

    • 使用enumerate(dataloader)遍历数据加载器中的每个样本,包括输入数据(inputs)和目标值(target)。
    • 将当前样本的输入数据和目标值分别添加到input_settarget_set列表中。
  4. 进行模型推理:

    • 初始化变量step为0,用于记录当前已预测的时间步数。
    • 初始化forecast_steps数组,用于存储当前样本的所有预测结果,形状为(batch_size, horizon, input_feature_dim),其中batch_size为样本批次大小,horizon为预测步长,input_feature_dim为输入特征维度。
    • 迭代预测逻辑:
      • 使用模型对当前输入数据进行预测,得到forecast_result
      • 检查模型输出长度(len_model_output),若为0则抛出异常。
      • 更新输入数据(inputs):将较旧的历史数据移至左侧,并将最新预测结果移至右侧,以准备下一轮预测。
      • 将模型当前输出的预测结果按需填充到forecast_steps数组中,确保每个样本的预测结果按时间步正确排列。
      • 更新step变量,累加已预测的时间步数。
  5. 处理完所有时间步后,将当前样本的预测结果、真实目标值和输入数据添加到相应列表中。

  6. 合并所有样本的预测结果、真实目标值和输入数据:

    • 使用np.concatenate函数,将forecast_settarget_setinput_set列表中的数据沿第一个维度(样本维度)拼接成一个完整的数组。
  7. 返回结果:

    • 返回合并后的预测结果数组、真实目标值数组和输入数据数组。

综上,inference方法通过遍历数据加载器、执行模型推理并收集预测结果、真实目标值和输入数据,最终返回这些数据的完整数组。该方法支持迭代预测(递归使用模型输出作为下一轮输入的一部分)和非迭代预测(单次模型预测即可得到全部结果),适用于不同类型的预测模型。

模型效果

最后,将我们构建好的MLP网络在PEMS数据集进行了准确性测试,算法测试的相关配置如下:

torch.manual_seed(4321)  # reproducible
parser = argparse.ArgumentParser(description='MLP on pems datasets')
### -------  dataset settings --------------
parser.add_argument('--dataset', type=str, default='PEMS08',choices=['PEMS03', 'PEMS04', 'PEMS07', 'PEMS08'])  # sometimes use: PeMS08
parser.add_argument('--norm_method', type=str, default='z_score')
parser.add_argument('--normtype', type=int, default=0)
### -------  input/output length settings --------------
parser.add_argument('--window_size', type=int, default=12)
parser.add_argument('--horizon', type=int, default=12)
parser.add_argument('--train_length', type=float, default=6)
parser.add_argument('--valid_length', type=float, default=2)
parser.add_argument('--test_length', type=float, default=2)
### -------  training settings --------------
parser.add_argument('--use_gpu', type=bool, default=False)
parser.add_argument('--train', type=bool, default=True)
parser.add_argument('--resume', type=bool, default=False)
parser.add_argument('--evaluate', type=bool, default=False)
parser.add_argument('--finetune', type=bool, default=False)
parser.add_argument('--validate_freq', type=int, default=1)
parser.add_argument('--epoch', type=int, default=80)
parser.add_argument('--lr', type=float, default=0.001)
parser.add_argument('--batch_size', type=int, default=8)
parser.add_argument('--optimizer', type=str, default='N')  #
parser.add_argument('--early_stop', type=bool, default=True)
parser.add_argument('--early_stop_step', type=int, default=5)
parser.add_argument('--exponential_decay_step', type=int, default=5)
parser.add_argument('--decay_rate', type=float, default=0.5)
parser.add_argument('--lradj', type=int, default=1, help='adjust learning rate')
parser.add_argument('--weight_decay', type=float, default=1e-5)
parser.add_argument('--model_name', type=str, default='MLP')
### -------  model settings --------------
parser.add_argument('--hidden_sizes', type=list, default=[24, 36, 24])
args = parser.parse_args()

结果如下:

数据集MAEMAPERMSE
PEMS0319.72260.18440331.5506
PEMS0427.24220.16668742.5976
PEMS0728.80020.12248444.1366
PEMS0820.85040.12366832.5307

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/314320.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法 || 二分查找

目录 二分查找 在排序数组中查找元素的第一个和最后一个位置 搜索插入位置 一个数组经过划分后具有二段性的都可以用二分查找 二分查找 704. 二分查找 - 力扣&#xff08;LeetCode&#xff09; ​ 暴力解法&#xff1a;直接遍历数组&#xff0c;找到 target 便返回下标&am…

原型链prototype、__proto、constructor的那些问题整理

再了解原型链之前,我们先来看看构造函数和实例对象的打印结构 - 函数 这里我们定义一个构造函数Fn,然后打印它的结构吧 function Fn(){} console.dir(Fn)控制台得到结构 从上面结构我们能看的出来,函数有两种原型,一种是作为函数特有的原型:prototype,另一种是作为对象的__…

低代码技术的全面应用:加速创新、降低成本

引言 在当今数字化转型的时代&#xff0c;企业和组织面临着不断增长的应用程序需求&#xff0c;以支持其业务运营和创新。然而&#xff0c;传统的软件开发方法通常需要大量的时间、资源和专业技能&#xff0c;限制了企业快速响应市场变化和业务需求的能力。在这样的背景下&…

chrome浏览器安装elasticsearch的head可视化插件

head插件简介 elasticsearch-head被称为是弹性搜索集群的web前端&#xff0c;head插件主要是用来和elastic Cluster交互的Web前端 head插件历史 elasticsearch-head插件在0.x-2.x版本的时候是集成在elasticsearch内的&#xff0c;由elasticsearch的bin/elasticsearch-plugin…

区块链 | OpenSea 相关论文:Toward Achieving Anonymous NFT Trading(一)

​ &#x1f951;原文&#xff1a; Toward Achieving Anonymous NFT Trading &#x1f951;写在前面&#xff1a; 本文对实体的介绍基于论文提出的方案&#xff0c;而非基于 OpenSea 实际采用的方案。 其实右图中的 Alice 也是用了代理的&#xff0c;不过作者没有画出来。 正文…

基于SpringBoot + Vue实现的校园(通知、投票)管理系统设计与实现+毕业论文(12000字)+答辩PPT+指导搭建视频

目录 项目介绍 运行环境 技术栈 效果展示 论文展示 总结 项目介绍 本系统包含管理员、用户、院校管理员三个角色。 管理员角色&#xff1a;用户管理、院校管理、单位类别管理、院校管理员管理、单位管理、通知推送管理、投票信息管理、通知回复管理等。 用户角色&#…

sqlite 附加(attach database)加密数据库时,返回26是什么原因呢?

&#x1f3c6;本文收录于「Bug调优」专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收藏&&…

数据结构—单链表

含义 1.顺序表的回顾 之前的文章已经谈到了顺序表&#xff0c;关于顺序表&#xff0c;有一下的几种特点 1.中间&#xff0c;头部的删除&#xff0c;复杂度为O(N); 2.增容会有申请新的空间&#xff0c;拷贝数据&#xff0c;释放旧的空间&#xff0c;会有不小的消耗&#xff…

数据结构:最小生成树(Prim算法和Kruskal算法)、图的最短路径(Dijkstra算法和Bellman-Ford算法)

什么是最小生成树&#xff1f;Prim算法和Kruskal算法是如何找到最小生成树的&#xff1f; 最小生成树是指在一个连通图中&#xff0c;通过连接所有节点并使得总权重最小的子图。 Prim算法和Kruskal算法是两种常用的算法&#xff0c;用于寻找最小生成树。 Prim算法的步骤如下&…

mysql的多表查询和子查询

多表查询&#xff1a;查询数据时&#xff0c;需要使用多张表来查询 多表查询分类&#xff1a; 1.内连接查询 2.外连接查询 3.子查询 笛卡尔积&#xff1a; create table class (id int primary key auto_increment,name varchar(10) ); create table student (id int primar…

学习STM32第十六天

RTC实时时钟 一、简介 RTC是一个独立的BCD格式定时器&#xff0c;提供一个时钟日历&#xff0c;两个可编程报警中断&#xff0c;一个具有中断功能周期性可编程唤醒标志&#xff0c;RTC和时钟配置系统处于后备区域。 通过两个32位寄存器以BCD格式实现秒、分钟、小时&#xff08…

stable-diffusion-webui安装与使用过程中的遇到的error合集

stable-diffusion-webui1.9.2踩坑安装 1. 安装过程1.1 stable-diffusion-webui1.2 在win11或win10系统安装&#xff0c;需修改两个启动脚本1.2.1 修改webui-user.bat1.2.2 修改webui.bat 1.3 双击 webui-user.bat 启动脚本1.3.1 no module xformers. Processing without on fre…

Grafana系列 | Grafana监控TDengine库数据 |Grafana自定义Dashboard

开始前可以去grafana官网看看dashboard文档 https://grafana.com/docs/grafana/latest/dashboards 本文主要是监控TDengine库数据 目录 一、TDengine介绍二、Grafana监控TDengine数据三、Grafana自定义Dashboard 监控TDengine库数据1、grafana 变量2、添加变量3、配置panel 一…

FSMC读取FPGA的FIFO

一、硬件说明 FSMC配置 单片机的代码如下&#xff1a; #define VALUE_ADDRESS_AD1 (__IO uint16_t *)0x60400000while (1){if(!HAL_GPIO_ReadPin(GPIOF, GPIO_PIN_8)) //数据非空{data *(__IO uint16_t *)VALUE_ADDRESS_AD1;data2 *(__IO uint16_t *)VALUE_ADDRESS_AD1…

【数据库】MongoDB

文章目录 [toc]数据库操作查询数据库切换数据库查询当前数据库删除数据库查询数据库版本 数据集合操作创建数据集合查询数据集合删除数据集合 数据插入插入id重复的数据 数据更新数据更新一条丢失其他字段保留其他字段 数据批量更新 数据删除数据删除一条数据批量删除 数据查询…

Transformer step by step--Positional Embedding 和 Word Embedding

Transformer step by step往期文章&#xff1a; Transformer step by step--层归一化和批量归一化 要把Transformer中的Embedding说清楚&#xff0c;那就要说清楚Positional Embedding和Word Embedding。至于为什么有这两个Embedding&#xff0c;我们不妨看一眼Transformer的…

Hadoop之路

hadoop更适合在liunx环境下运行&#xff0c;会节省后期很多麻烦&#xff0c;而用虚拟器就太占主机内存了&#xff0c;因此后面我们将把hadoop安装到wsl后进行学习,后续学习的环境是Ubuntu-16.04 &#xff08;windows上如何安装wsl&#xff09; 千万强调&#xff0c;有的命令一…

【24年物联网华为杯】赛题分析与初步计划

赛事介绍 官网链接&#xff1a;2024 年全国大学生物联网设计竞赛 (sjtu.edu.cn) 含金量&#xff1a;属于A类赛事 &#xff08;注意&#xff1a;很多搜索结果的序号是按照选入时间排列的&#xff0c;与含金量无关&#xff0c;华为杯是23年选入的&#xff09; Kimi Chat: 全国…

JavaScript创建和填充数组的更多方法

空数组fill()方法创建并填充数组 ● 我们之前创建数组的方式都是手动去创建去一个数据&#xff0c;例如 console.log([1, 2, 3, 4, 5, 6, 7]);● 当然我们也可以使用Array对象来构造数组 console.log([1, 2, 3, 4, 5, 6, 7]); console.log(new Array(1, 2, 3, 4, 5, 6, 7));…

Java毕业设计 基于SpringBoot vue城镇保障性住房管理系统

Java毕业设计 基于SpringBoot vue城镇保障性住房管理系统 SpringBoot 城镇保障性住房管理系统 功能介绍 首页 图片轮播 房源信息 房源详情 申请房源 公示信息 公示详情 登录注册 个人中心 留言反馈 后台管理 登录 个人中心 修改密码 个人信息 用户管理 房屋类型 房源信息管理…