近来,机器学习得到了长足的发展,并引起了广泛的关注,其中语音和图像识别领域的成果最为显著。本文分析了深度学习模型——堆叠门控循环单元 Stacked GRU 在股市的表现。论文显示,虽然这种技术在自然语言处理、语音识别等其他领域取得了不错的成绩,但在金融时间序列预测上却表现不佳。事实上,金融数据的特点是噪声信号比高,这使得机器学习模型难以找到模式并预测未来价格。
本文通过对 GRU 时间序列预测模型的介绍,探讨Stacked GRU在股市科技股中的表现。本研究文章的结构如下。第一节介绍金融时间序列数据。第二节对金融时间数进行特征工程。第三节是构建模型、定义参数空间、损失函数与优化器。第四节是训练模型。第五节是评估模型与结果可视化。第六部分是预测下一个时间点的收盘价。
GRU 单变量时间序列预测
- 1. 金融时间序列数据
- 1.1 数据预处理
- 1.2 探索性分析(可视化)
- 1.2.1 股票的日收盘价
- 1.2.2 股票的日收益率
- 1.2.3 股票收益率自相关性
- 2. 时间数据特征工程(APPL)
- 2.1 构造序列数据
- 2.2 特征缩放(归一化)
- 2.3 数据集划分(TimeSeriesSplit)
- 2.4 数据集张量(TensorDataset)
- 3. 构建时间序列模型(Stacked GRU)
- 3.1 构建 GRU 模型
- 3.2 定义模型、损失函数与优化器
- 4. 模型训练与可视化
- 5. 模型评估与可视化
- 5.1 均方误差
- 5.2 反归一化
- 5.3 结果可视化
- 6. 模型预测
- 6.1 转换最新时间步收盘价的数组为张量
- 6.2 预测下一个时间点的收盘价格
1. 金融时间序列数据
- 时间连续性:数据按照时间的先后顺序排列,反映了金融市场的动态变化过程。
- 噪声和不确定性:金融市场受到多种复杂因素的影响,因此数据中存在大量噪声和不确定性。
- 非线性和非平稳性:金融时间序列数据通常呈现出明显的非线性和非平稳性特征。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as snsfrom sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import TimeSeriesSplitimport torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from torchinfo import summary
from tqdm import tqdm
1.1 数据预处理
函数将标量、数组、Series 或 DataFrame/dict-like 转换为 pandas datetime 对象。
AAPL = pd.read_csv('AAPL.csv')
# Let's convert the data type of timestamp column to datatime format
AAPL['Date'] = pd.to_datetime(AAPL['Date'])
print(type(AAPL['Close'].iloc[0]),type(AAPL['Date'].iloc[0]))# Selecting subset
cond_1 = AAPL['Date'] >= '2021-04-23 00:00:00'
cond_2 = AAPL['Date'] <= '2024-04-23 00:00:00'
AAPL = AAPL[cond_1 & cond_2].set_index('Date')
<class 'numpy.float64'> <class 'str'>
<class 'numpy.float64'> <class 'pandas._libs.tslibs.timestamps.Timestamp'>
(755, 6)
1.2 探索性分析(可视化)
探索性数据分析 E D A EDA EDA 是一种使用视觉技术分析数据的方法。它用于发现趋势和模式,或借助统计摘要和图形表示来检查假设。
1.2.1 股票的日收盘价
# plt.style.available
# 绘制收盘价
plt.figure(figsize=(18, 6))
plt.plot(AAPL['Adj Close'], label='AAPL')# 设置图表标题和轴标签
plt.title('Close Price with Moving Averages')
plt.ylabel('Price $', fontsize=18)# 显示图例
1.2.2 股票的日收益率
股票的日收益率是反映投资者在一天内从股票投资中获得的回报比例。它通常用百分比来表示,计算公式为:日收益率 = (今日收盘价 - 前一日收盘价) / 前一日收盘价 × 100%,这里我们可是使用 .pct_change()
plt.title('Daily Return History')
plt.plot(AAPL['Adj Close'].pct_change(),linestyle='--',marker='*',label='AAPL')
plt.ylabel('Daily Return', fontsize=18)
1.2.3 股票收益率自相关性
AAPL['Returns'] = AAPL['Adj Close'].pct_change()# 使用pandas的autocorr函数计算自相关系数
# 注意:autocorr默认计算的是滞后1的自相关系数,要计算其他滞后的,需要循环或使用其他方法
autocorr_values = [AAPL['Returns'].autocorr(lag=i) for i in range(1, 301)] # 假设我们查看滞后1到300的自相关# 使用matplotlib绘制自相关系数
plt.figure(figsize=(18, 6))
plt.plot(range(1, 301), autocorr_values, linestyle='-.', marker='*')
plt.title('Autocorrelation of Stock Returns')
2. 时间数据特征工程(APPL)
在时间序列分析中,时间窗口通常用于描述在训练模型时考虑的连续时间步 time steps 的数量。这个时间窗口的大小,即 window_size
# 设置时间窗口大小
window_size = 60
2.1 构造序列数据
和 lookback
,前者是要转换成数据集的 NumPy 数组,后者是用作预测下一个时间段的输入变量的前一时间步数,默认设为 1。
# 构造序列数据函数
def create_dataset(dataset, lookback=1):"""Transform a time series into a prediction datasetArgs:dataset: A numpy array of time series, first dimension is the time stepslookback: Size of window for prediction"""X, y = [], []for i in range(len(dataset)-lookback): feature = dataset[i:(i+lookback), 0]target = dataset[i + lookback, 0]X.append(feature)y.append(target)return np.array(X), np.array(y)
2.2 特征缩放(归一化)
函数主要用于将特征数据按比例缩放到指定的范围。默认情况下,它将数据缩放到[0, 1]区间内,但也可以通过参数设置将数据缩放到其他范围。在机器学习中,MinMaxScaler()
# 选取AAPL['Close']作为特征, 归一化数据
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(AAPL['Close'].values.reshape(-1, 1))
# 创建数据集
X, y = create_dataset(scaled_data, lookback=window_size)
# 重塑输入数据为[samples, time steps, features]
X = np.reshape(X, (X.shape[0], X.shape[1], 1))
2.3 数据集划分(TimeSeriesSplit)
函数与传统的交叉验证方法不同,TimeSeriesSplit 特别适用于需要考虑时间顺序的数据集,因为它确保测试集中的所有数据点都在训练集数据点之后,并且可以分割多个训练集和测试集。
# 使用TimeSeriesSplit划分数据集,根据需要调整n_splits
tscv = TimeSeriesSplit(n_splits=3, test_size=90)
# 遍历所有划分进行交叉验证
for i, (train_index, test_index) in enumerate(tscv.split(X)):X_train, X_test = X[train_index], X[test_index]y_train, y_test = y[train_index], y[test_index]# print(f"Fold {i}:")# print(f" Train: index={train_index}")# print(f" Test: index={test_index}")# 查看最后一个 fold 数据帧的维度
print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)
(605, 60, 1) (90, 60, 1) (605,) (90,)
2.4 数据集张量(TensorDataset)
# 将 NumPy数组转换为 tensor张量
X_train_tensor = torch.from_numpy(X_train).type(torch.Tensor)
X_test_tensor = torch.from_numpy(X_test).type(torch.Tensor)
y_train_tensor = torch.from_numpy(y_train).type(torch.Tensor).view(-1,1)
y_test_tensor = torch.from_numpy(y_test).type(torch.Tensor).view(-1,1)print(X_train_tensor.shape, X_test_tensor.shape, y_train_tensor.shape, y_test_tensor.shape)
函数用于重塑张量对象,它等同于 NumPy 中的 reshape()
函数,允许我们重组数据,以匹配 LSTM 模型所需的输入形状。以这种方式重塑数据可确保 LSTM 模型以预期格式接收数据。
torch.Size([605, 60, 1]) torch.Size([90, 60, 1]) torch.Size([605, 1]) torch.Size([90, 1])
使用 TensorDataset
和 DataLoader
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
3. 构建时间序列模型(Stacked GRU)
GRU (Gated Recurrent Unit)是一种循环神经网络 R N N RNN RNN的变体,用于处理和预测序列数据。与标准RNN相比,GRU能够更有效地捕捉长期依赖关系,并且在训练时更不容易出现梯度消失或梯度爆炸的问题。
🔗 PyTorch所提供的数学公式及解释如下:
Apply a multi-layer gated recurrent unit (GRU) RNN to an input sequence. For each element in the input sequence, each layer computes the following function:
r t = σ ( W i r x t + b i r + W h r h ( t − 1 ) + b h r ) z t = σ ( W i z x t + b i z + W h z h ( t − 1 ) + b h z ) n t = tanh ( W i n x t + b i n + r t ⊙ ( W h n h ( t − 1 ) + b h n ) ) h t = ( 1 − z t ) ⊙ n t + z t ⊙ h ( t − 1 ) \begin{array}{ll} r_t = \sigma(W_{ir} x_t + b_{ir} + W_{hr} h_{(t-1)} + b_{hr}) \\ z_t = \sigma(W_{iz} x_t + b_{iz} + W_{hz} h_{(t-1)} + b_{hz}) \\ n_t = \tanh(W_{in} x_t + b_{in} + r_t \odot (W_{hn} h_{(t-1)}+ b_{hn})) \\ h_t = (1 - z_t) \odot n_t + z_t \odot h_{(t-1)} \end{array} rt=σ(Wirxt+bir+Whrh(t−1)+bhr)zt=σ(Wizxt+biz+Whzh(t−1)+bhz)nt=tanh(Winxt+bin+rt⊙(Whnh(t−1)+bhn))ht=(1−zt)⊙nt+zt⊙h(t−1)
where h t h_t ht is the hidden state at time t t t, x t x_t xt is the input at time t t t, h ( t − 1 ) h_{(t-1)} h(t−1) is the hidden state of the layer at time t − 1 t-1 t−1 or the initial hidden state at time 0 0 0, and r t r_t rt, z t z_t zt, n t n_t nt are the reset, update, and new gates, respectively. σ \sigma σ is the sigmoid function, and ⊙ \odot ⊙ is the Hadamard product.
In a multilayer GRU, the input x t ( l ) x^{(l)}_t xt(l) of the l l l -th layer ( l ≥ 2 l \ge 2 l≥2) is the hidden state h t ( l − 1 ) h^{(l-1)}_t ht(l−1) of the previous layer multiplied by dropout δ t ( l − 1 ) \delta^{(l-1)}_t δt(l−1) where each δ t ( l − 1 ) \delta^{(l-1)}_t δt(l−1) is a Bernoulli random variable which is 0 0 0 with probability d r o p o u t dropout dropout.
3.1 构建 GRU 模型
class GRUNet(nn.Module):def __init__(self, input_dim, hidden_dim, output_dim=1, num_layers=2):# input_dim 是输入特征的维度,hidden_dim 是隐藏层神经单元维度或称为隐藏状态的大小,output_dim 是输出维度,# num_layers 是网络层数,设置 num_layers=2 表示将两个 GRU 堆叠在一起形成一个堆叠 GRU,第二个 GRU 接收第一个 GRU 的输出并计算最终结果super(GRUNet, self).__init__()# 通过调用 super(GRUNet, self).__init__() 初始化父类 nn.Moduleself.hidden_dim = hidden_dimself.num_layers = num_layersself.gru = nn.GRU(input_dim, hidden_dim, num_layers, batch_first=True)# 定义 GRU 层,使用 batch_first=True 表示输入数据的形状是 [batch_size, seq_len(time_steps), input_dim]self.fc = nn.Linear(hidden_dim, output_dim)# 定义全连接层,将 GRU 的最后一个隐藏状态映射到输出维度 output_dimdef forward(self, x):h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(x.device)# 初始化h0为全零张量,h0代表隐藏状态(hidden state)的初始值,形状为 [num_layers * num_directions, batch, hidden_dim]# 如果没有指定双向参数 bidirectional为 True,num_directions 默认为 1out, _ = self.gru(x, h0)# 将输入数据 x 和初始隐藏状态 h0 传递给 GRU层,得到输出 out(即所有时间步的输出)和最后一个时间步的隐藏状态 hn(这里用 _忽略)out = self.fc(out[:, -1, :])# GRU 的输出是一个三维张量,其形状是 [batch_size, seq_len(time_steps), hidden_dim],# 这里我们只取最后一个时间步的隐藏状态 out[:, -1, :] 并传递给全连接层。return out
3.2 定义模型、损失函数与优化器
要在 PyTorch 中构建堆叠 GRU,我们需要调用 GRUNet 类,通过输入 num_layers
model = GRUNet(input_dim=1, # 输入数据的特征数量 X_train.shape[2]hidden_dim=64,output_dim=1,num_layers=2) # 表示将两个 GRU 堆叠在一起形成一个堆叠 GRU
criterion = torch.nn.MSELoss() # 定义均方误差损失函数
optimizer = torch.optim.Adam(model.parameters(), lr=0.01) # 定义优化器
summary(model, (32, 60, 1)) # batch_size, seq_len(time_steps), input_dim
Layer (type:depth-idx) Output Shape Param #
GRUNet [32, 1] --
├─GRU: 1-1 [32, 60, 64] 37,824
├─Linear: 1-2 [32, 1] 65
Total params: 37,889
Trainable params: 37,889
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 72.62
Input size (MB): 0.01
Forward/backward pass size (MB): 0.98
Params size (MB): 0.15
Estimated Total Size (MB): 1.14
4. 模型训练与可视化
train_loss = []
num_epochs = 20for epoch in range(num_epochs):model.train() # 初始化训练进程pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")for batch_idx, (data, target) in enumerate(pbar):# 前向传播outputs = model(data) # 每个批次的预测值loss = criterion(outputs, target)# 反向传播和优化optimizer.zero_grad()loss.backward()optimizer.step()# 记录损失值train_loss.append(loss.item())# 更新进度条pbar.update()# 这里只用于显示当前批次的损失,不是平均损失pbar.set_postfix({'Train loss': f'{loss.item():.4f}'})
这里我们使用 tqdm
Epoch 1/20: 100%|███████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.25it/s, Train loss=0.0211]
Epoch 2/20: 100%|███████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.07it/s, Train loss=0.0052]
Epoch 3/20: 100%|███████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.24it/s, Train loss=0.0030]
Epoch 4/20: 100%|███████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.06it/s, Train loss=0.0014]
Epoch 5/20: 100%|███████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.19it/s, Train loss=0.0011]
Epoch 6/20: 100%|███████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.21it/s, Train loss=0.0007]
Epoch 7/20: 100%|███████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.21it/s, Train loss=0.0015]
Epoch 8/20: 100%|███████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.22it/s, Train loss=0.0015]
Epoch 9/20: 100%|███████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.21it/s, Train loss=0.0012]
Epoch 10/20: 100%|██████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.07it/s, Train loss=0.0010]
Epoch 11/20: 100%|██████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.07it/s, Train loss=0.0014]
Epoch 12/20: 100%|██████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.19it/s, Train loss=0.0011]
Epoch 13/20: 100%|██████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.21it/s, Train loss=0.0013]
Epoch 14/20: 100%|██████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.07it/s, Train loss=0.0013]
Epoch 15/20: 100%|██████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.03it/s, Train loss=0.0020]
Epoch 16/20: 100%|██████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.08it/s, Train loss=0.0012]
Epoch 17/20: 100%|██████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.07it/s, Train loss=0.0009]
Epoch 18/20: 100%|██████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.22it/s, Train loss=0.0014]
Epoch 19/20: 100%|██████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.07it/s, Train loss=0.0019]
Epoch 20/20: 100%|██████████████████████████████████████████████████| 19/19 [00:01<00:00, 13.11it/s, Train loss=0.0013]
5. 模型评估与可视化
5.1 均方误差
model.eval() # 将模型设置为评估模式
test_loss = [] # 初始化损失
pbar = tqdm(test_loader, desc="Evaluating")
with torch.no_grad():for data, target in pbar:test_pred = model(data)loss = criterion(test_pred, target)test_loss.append(loss.item())# 计算当前批次的平均损失batch_avg_loss = sum(test_loss)/len(test_loss)pbar.set_postfix({'Test Loss': f'{batch_avg_loss:.4f}'})pbar.update() # 更新进度条pbar.close() # 关闭进度条
Evaluating: 100%|██████████████████████████████████████████████████████| 3/3 [00:00<00:00, 51.30it/s, Test Loss=0.0011]
5.2 反归一化
# 反归一化预测结果
train_pred = scaler.inverse_transform(model(X_train_tensor).detach().numpy())
y_train = scaler.inverse_transform(y_train_tensor.detach().numpy())
test_pred = scaler.inverse_transform(model(X_test_tensor).detach().numpy())
y_test = scaler.inverse_transform(y_test_tensor.detach().numpy())print(train_pred.shape, y_train.shape, test_pred.shape, y_test.shape)
(605, 1) (605, 1) (90, 1) (90, 1)
5.3 结果可视化
# shift train predictions for plotting
trainPredict = AAPL[window_size:X_train.shape[0]+X_train.shape[1]]
trainPredictPlot = trainPredict.assign(TrainPrediction=train_pred)testPredict = AAPL[X_train.shape[0]+X_train.shape[1]:]
testPredictPlot = testPredict.assign(TestPrediction=test_pred)
# Visualize the data
plt.title('GRU Close Price Validation')
plt.plot(AAPL['Close'], color='blue', label='original')
plt.plot(trainPredictPlot['TrainPrediction'], color='orange',label='Train Prediction')
plt.plot(testPredictPlot['TestPrediction'], color='red', label='Test Prediction')
6. 模型预测
6.1 转换最新时间步收盘价的数组为张量
# 假设latest_closes是一个包含最新window_size个收盘价的列表或数组
latest_closes = AAPL['Close'][-window_size:].values
latest_closes = latest_closes.reshape(-1, 1)
scaled_latest_closes = scaler.fit_transform(latest_closes)
tensor_latest_closes = torch.from_numpy(scaled_latest_closes).type(torch.Tensor).view(1, window_size, 1)
torch.Size([1, 60, 1])
6.2 预测下一个时间点的收盘价格
# 使用模型预测下一个时间点的收盘价
next_close_pred = model(tensor_latest_closes)
next_close_pred = scaler.inverse_transform(next_close_pred.detach().numpy())
array([[166.83992]], dtype=float32)