获取数据
先用下面这段代码获取上证指数的历史数据,得到的csv文件数据,为后面训练模型用的
import akshare as ak
import pandas as pd# 获取上证指数历史数据
df = ak.stock_zh_index_daily(symbol="sh000001")# 将数据保存到本地CSV文件
df.to_csv("sh000001.csv", index=False, encoding="utf-8-sig")# 打印数据
print(df)
注意:运行上段代码之前,需要先用下面这个指令安装akshare包:
pip install akshare
运行完上述代码之后,会得到如下一个csv文件:
训练模型
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader# ------------------------
# 1. 加载数据
# ------------------------
data = pd.read_csv('sh000001.csv') # 替换为你的文件路径
data['date'] = pd.to_datetime(data['date'])
data.set_index('date', inplace=True)# 提取收盘价数据
close_prices = data['close'].values.reshape(-1, 1)# ------------------------
# 2. 数据预处理
# ------------------------
# 归一化
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_close = scaler.fit_transform(close_prices)# 创建时间序列数据集(滑动窗口)
def create_dataset(data, window_size=60):X, y = [], []for i in range(len(data) - window_size - 7): # 预测未来7天X.append(data[i:i+window_size])y.append(data[i+window_size:i+window_size+7]) # 输出未来7天的数据return np.array(X), np.array(y)window_size = 60 # 用过去60天的数据预测未来7天
X, y = create_dataset(scaled_close, window_size)# 划分训练集和测试集
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]# 转换为PyTorch张量
X_train = torch.FloatTensor(X_train)
y_train = torch.FloatTensor(y_train)
X_test = torch.FloatTensor(X_test)
y_test = torch.FloatTensor(y_test)# 自定义Dataset类
class TimeSeriesDataset(Dataset):def __init__(self, X, y):self.X = Xself.y = ydef __len__(self):return len(self.X)def __getitem__(self, idx):return self.X[idx], self.y[idx]# 创建DataLoader
batch_size = 64
train_dataset = TimeSeriesDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)# ------------------------
# 3. 定义LSTM模型
# ------------------------
class LSTMModel(nn.Module):def __init__(self, input_size=1, hidden_size=50, output_size=7):super().__init__()self.hidden_size = hidden_sizeself.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)self.linear = nn.Linear(hidden_size, output_size)def forward(self, x):# LSTM层out, (h_n, c_n) = self.lstm(x)# 仅取最后一个时间步的隐藏状态out = self.linear(out[:, -1, :])return out# 初始化模型
model = LSTMModel(input_size=1, hidden_size=100, output_size=7)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)# ------------------------
# 4. 训练模型
# ------------------------
num_epochs = 50
model.train()for epoch in range(num_epochs):for batch_X, batch_y in train_loader:# 前向传播outputs = model(batch_X)loss = criterion(outputs, batch_y.squeeze())# 反向传播optimizer.zero_grad()loss.backward()optimizer.step()print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()*1000:.6f}')# ------------------------
# 5. 预测与评估
# ------------------------
model.eval()
with torch.no_grad():# 测试集预测test_pred = model(X_test)test_pred = test_pred.numpy()# 反归一化test_pred = scaler.inverse_transform(test_pred.reshape(-1, 1)).reshape(-1, 7)y_test_actual = scaler.inverse_transform(y_test.numpy().reshape(-1, 1)).reshape(-1, 7)# 预测未来7天(使用最新数据)last_window = scaled_close[-window_size:].reshape(1, window_size, 1)last_window_tensor = torch.FloatTensor(last_window)future_pred = model(last_window_tensor).numpy()future_pred = scaler.inverse_transform(future_pred.reshape(-1, 1)).flatten()# ------------------------
# 6. 可视化结果
# ------------------------
# 测试集预测示例(取第一条样本)
plt.figure(figsize=(12, 6))
plt.plot(y_test_actual[0], label='True Future')
plt.plot(test_pred[0], label='Predicted Future')
plt.legend()
plt.title('Test Set Prediction Example')
plt.show()# 未来7天预测
print("未来7天收盘价预测:")
for i, price in enumerate(future_pred):print(f'Day {i+1}: {price:.2f}')
关键步骤解释
数据预处理:
使用 MinMaxScaler 归一化收盘价到 [0, 1]。
创建滑动窗口数据集(用过去 window_size=60 天的数据预测未来7天)。
模型结构:
LSTM层:输入维度为1(单变量时间序列),隐藏层维度为100。
全连接层:将LSTM最后一个时间步的隐藏状态映射到未来7天的输出。
训练与预测:
使用均方误差(MSE)作为损失函数。
训练完成后,对测试集和未来7天进行预测,并反归一化得到实际价格。
未来预测:
使用最新的 window_size=60 天数据生成输入窗口,预测未来7天收盘价。