Note:
This article is intended solely for learning the algorithms and tools involved; nothing in it constitutes investment advice.
Preface:
I have recently been studying time-series trend prediction. Stock prices are naturally time-series data, so I used data for a single stock as study material. The code below predicts a stock's daily high; the same approach also works for the daily open, low, close, and other prices.
1. Preparation
1.1 Data source: tushare
When doing stock data analysis, we often run into the problem of having no data source. tushare makes this easy: after a simple registration, you can call the tushare pro API.
Registration: https://tushare.pro/register?reg=491338
For usage details, see:
https://tushare.pro/document/1?doc_id=37
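To make the workflow concrete, here is a minimal fetch sketch. The token string is a placeholder for your own account's token; the ts_code and date range are the same ones used in the full script below:

import tushare as ts

ts.set_token('your_token_here')  # placeholder: replace with your tushare pro token
df = ts.pro_bar(ts_code='600519.SH', start_date='20210101', end_date='20220211')
print(df[['trade_date', 'open', 'close', 'low', 'high']].head())  # newest rows come first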
1.2 Algorithm background: LSTM
https://www.jianshu.com/p/9dc9f41f0b29
https://colah.github.io/posts/2015-08-Understanding-LSTMs/
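For readers new to PyTorch's LSTM, this small standalone sketch (illustrative only, using the same input/hidden sizes as the script below) shows the (seq_len, batch, feature) tensor convention that the regression model relies on:

import torch
from torch import nn

rnn = nn.LSTM(input_size=4, hidden_size=8, num_layers=1)  # same sizes as inp_dim/mid_dim below
x = torch.randn(30, 1, 4)  # (seq_len=30, batch=1, features=4)
y, (h, c) = rnn(x)
print(y.shape)             # torch.Size([30, 1, 8]): one hidden vector per timestep
print(h.shape, c.shape)    # torch.Size([1, 1, 8]) each: final hidden / cell state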
The following shows how to predict the next day's high for a stock from its historical data.
Here is the full code; just replace the token in it with your own account's token:
2. Code
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : lstm_trend.py
import datetime
import os

import matplotlib.pyplot as plt
import numpy as np
import torch
import tushare as ts
from torch import nn
from torch.nn.utils.rnn import pad_sequence


def run_train_lstm():
    inp_dim = 4        # input features per day: open, close, low, high
    out_dim = 1        # output: the next day's high
    mid_dim = 8        # LSTM hidden size
    mid_layers = 1     # number of stacked LSTM layers
    batch_size = 12 * 12
    mod_dir = '.'

    '''load data'''
    data, mean, std = load_data()
    sink_mean_std('mean_std.txt', [mean, std])
    data_x = data[:-1, :]   # features of day t
    data_y = data[+1:, -1]  # target: the 'high' column of day t + 1
    assert data_x.shape[1] == inp_dim

    train_size = int(len(data_x) * 0.75)

    train_x = data_x[:train_size]
    train_y = data_y[:train_size]
    train_x = train_x.reshape((train_size, inp_dim))
    train_y = train_y.reshape((train_size, out_dim))

    '''build model'''
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = RegLSTM(inp_dim, out_dim, mid_dim, mid_layers).to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=0.6e-2)

    '''train'''
    var_x = torch.tensor(train_x, dtype=torch.float32, device=device)
    var_y = torch.tensor(train_y, dtype=torch.float32, device=device)

    # build a batch of training-series suffixes of increasing length;
    # i = 0 yields an empty sequence, which pad_sequence simply zero-pads
    batch_var_x = list()
    batch_var_y = list()
    for i in range(batch_size):
        j = train_size - i
        batch_var_x.append(var_x[j:])
        batch_var_y.append(var_y[j:])

    batch_var_x = pad_sequence(batch_var_x)  # (seq_len, batch_size, inp_dim)
    batch_var_y = pad_sequence(batch_var_y)  # (seq_len, batch_size, out_dim)

    with torch.no_grad():
        # tanh ramp so later timesteps carry more weight; it must match the
        # padded sequence length and be shaped (seq_len, 1, 1) so that it
        # broadcasts per timestep in the loss below
        seq_len = batch_var_y.shape[0]
        weights = np.tanh(np.arange(seq_len) * (np.e / seq_len))
        weights = torch.tensor(weights, dtype=torch.float32, device=device).view(-1, 1, 1)

    print("Training Start")
    for e in range(384):
        out = net(batch_var_x)

        # time-weighted MSE instead of a plain nn.MSELoss()
        loss = (out - batch_var_y) ** 2 * weights
        loss = loss.mean()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if e % 60 == 0:
            print('Epoch: {:4}, Loss: {:.5f}'.format(e, loss.item()))
    torch.save(net.state_dict(), '{}/net.pth'.format(mod_dir))
    print("Save in:", '{}/net.pth'.format(mod_dir))

    '''eval'''
    net.load_state_dict(torch.load('{}/net.pth'.format(mod_dir),
                                   map_location=lambda storage, loc: storage))
    net = net.eval()

    test_x = data_x.copy()
    # blank out the future 'high' column (the prediction target); the other
    # features of future days are still taken from real data in this scheme
    test_x[train_size:, -1] = 0
    test_x = test_x[:, np.newaxis, :]  # (seq_len, batch=1, inp_dim)
    test_x = torch.tensor(test_x, dtype=torch.float32, device=device)

    '''simple way, but not elegant'''
    # for i in range(train_size, len(data) - 2):
    #     test_y = net(test_x[:i])
    #     test_x[i, 0, -1] = test_y[-1].item()

    '''elegant way, slightly more complicated: carry (h, c) forward step by step'''
    eval_size = 1
    zero_ten = torch.zeros((mid_layers, eval_size, mid_dim), dtype=torch.float32, device=device)
    test_y, hc = net.output_y_hc(test_x[:train_size], (zero_ten, zero_ten))
    # the prediction produced at row train_size - 1 is the high of day
    # train_size; feed it back into the 'high' column (index -1), which is
    # the column data_y is taken from
    test_x[train_size, 0, -1] = test_y[-1].item()
    for i in range(train_size, len(data) - 2):
        test_y, hc = net.output_y_hc(test_x[i:i + 1], hc)
        test_x[i + 1, 0, -1] = test_y[-1].item()
    pred_y = test_x[1:, 0, -1]
    pred_y = pred_y.cpu().data.numpy()

    diff_y = pred_y[train_size:] - data_y[train_size:-1]
    l1_loss = np.mean(np.abs(diff_y))
    l2_loss = np.mean(diff_y ** 2)
    print("L1: {:.3f}    L2: {:.3f}".format(l1_loss, l2_loss))

    plt.plot(pred_y, 'r', label='pred')
    plt.plot(data_y, 'b', label='real', alpha=0.3)
    plt.plot([train_size, train_size], [-1, 2], color='k', label='train | pred')
    plt.legend(loc='best')
    os.makedirs('pics', exist_ok=True)  # make sure the output directory exists
    tc = datetime.datetime.strftime(datetime.datetime.now(), '%Y%m%d')
    plt.savefig(f'pics/lstm_reg_{tc}.png')
    plt.pause(4)


class RegLSTM(nn.Module):
    def __init__(self, inp_dim, out_dim, mid_dim, mid_layers):
        super(RegLSTM, self).__init__()

        self.rnn = nn.LSTM(inp_dim, mid_dim, mid_layers)  # rnn
        self.reg = nn.Sequential(
            nn.Linear(mid_dim, mid_dim),
            nn.Tanh(),
            nn.Linear(mid_dim, out_dim),
        )  # regression head

    def forward(self, x):
        y = self.rnn(x)[0]  # y, (h, c) = self.rnn(x)

        seq_len, batch_size, hid_dim = y.shape
        y = y.view(-1, hid_dim)
        y = self.reg(y)
        y = y.view(seq_len, batch_size, -1)
        return y

    def output_y_hc(self, x, hc):
        # same as forward(), but takes and returns the (h, c) state so the
        # caller can continue the sequence one step at a time
        y, hc = self.rnn(x, hc)

        seq_len, batch_size, hid_dim = y.size()
        y = y.view(-1, hid_dim)
        y = self.reg(y)
        y = y.view(seq_len, batch_size, -1)
        return y, hc


class RegGRU(nn.Module):
    # GRU counterpart of RegLSTM; not used by run_train_lstm(), kept for comparison
    def __init__(self, inp_dim, out_dim, mod_dim, mid_layers):
        super(RegGRU, self).__init__()

        self.rnn = nn.GRU(inp_dim, mod_dim, mid_layers)
        self.reg = nn.Linear(mod_dim, out_dim)

    def forward(self, x):
        x, h = self.rnn(x)  # (seq, batch, hidden)

        seq_len, batch_size, hid_dim = x.shape
        x = x.view(-1, hid_dim)
        x = self.reg(x)
        x = x.view(seq_len, batch_size, -1)
        return x

    def output_y_h(self, x, h):
        y, h = self.rnn(x, h)

        seq_len, batch_size, hid_dim = y.size()
        y = y.view(-1, hid_dim)
        y = self.reg(y)
        y = y.view(seq_len, batch_size, -1)
        return y, h


def sink_mean_std(file, mean_std):
    # save the normalization constants so predictions can be mapped back to prices
    with open(file, 'w') as af:
        for stock in mean_std:
            af.write(str(stock) + '\n')


def load_data():
    ts.set_token('xxxxx')  # set your token here
    sq = ts.pro_bar(ts_code='600519.SH', start_date='20210101',
                    end_date='20220211')[['open', 'close', 'low', 'high']]
    np_sq = sq.values[::-1]  # pro_bar returns newest first; reverse to chronological order
    # z-score normalization, per column
    seq = (np_sq - np_sq.mean(axis=0)) / np_sq.std(axis=0)
    return seq, np_sq.mean(axis=0), np_sq.std(axis=0)


if __name__ == '__main__':
    run_train_lstm()
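Note that pred_y lives in normalized (z-score) space. To read a prediction as an actual price, invert the normalization with the per-column mean/std that load_data() returns (and that sink_mean_std() writes to mean_std.txt); the 'high' column is the last one. A minimal sketch, assuming mean and std are still in memory rather than re-parsed from the file:

# pred_y, mean, std as produced inside run_train_lstm() via load_data()
pred_high_price = pred_y * std[-1] + mean[-1]  # undo the z-score for the 'high' column
print(pred_high_price[-1])  # predicted high for the last day, in CNY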
The results are as follows:
Because daily stock price moves are capped at ±10% to begin with, the predictions look fairly accurate. In reality, however, price movements cannot simply be extrapolated from historical data, so use this for learning purposes only.