# 用LSTM预测股价

#### 环境准备

``````import math

import matplotlib.pyplot as plt
import numpy as np
from keras.layers import Dense
from keras.layers import LSTM
from keras.models import Sequential
from pandas import read_csv
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler

%matplotlib inline``````

#### 数据描述

Tips: 股票小知识

• Date：日期
• Open：开盘价（股票在某一天的起始价）
• High：最高价
• Low：最低价
• Close：收盘价（股票在某一天的最终价）
• Volume：成交量（当天成交的股票数量）

``````# 用pandas载入数据集
dataframe = read_csv('data/stock_data.csv', usecols=[4], engine='python', skipfooter=3)
data = dataframe.values

# 将整型变为float
data = data.astype('float32')
plt.plot(data)
plt.show()``````

#### 构建训练集与测试集

X_0 = (p_0, p_1, ..., p_{i-1})
X_1 = (p_i, p_{i+1}, ..., p_{2i-1})
...
X_t = (p_{ti}, p_{ti+1}, ..., p_{(t+1)i-1})

X_{t+1} = (p_{(t+1)i}, p_{(t+1)i+1}, ..., p_{(t+2)i-1})

Input 1 = [p_0, p_1, p_2, p_3, p_4, p_5], Label 1 = [p_6]
Input 2 = [p_1, p_2, p_3, p_4, p_5, p_6], Label 2 = [p_7]
Input 3 = [p_2, p_3, p_4, p_5, p_6, p_7], Label 3 = [p_8]

``````# 根据原始数据集构建矩阵
def create_dataset(data, time_steps):
dataX, dataY = [], []
for i in range(len(data) - time_steps):
a = data[i:(i + time_steps), 0]
dataX.append(a)
dataY.append(data[i + time_steps, 0])
return np.array(dataX), np.array(dataY)``````

``````# 归一化
scaler = MinMaxScaler(feature_range=(0, 1))
data = scaler.fit_transform(data)

# 切割为训练集和测试集
train_size = int(len(data) * 0.9555)
test_size = len(data) - train_size
train, test = data[0:train_size,:], data[train_size:len(data),:]
time_steps = 6
trainX, trainY = create_dataset(train, time_steps)
testX, testY = create_dataset(test, time_steps)

# reshape输入模型数据的格式为：[samples, time steps, features]
trainX = np.reshape(trainX, (trainX.shape[0], trainX.shape[1], 1))
testX = np.reshape(testX, (testX.shape[0], testX.shape[1], 1))``````

#### 建立并训练LSTM模型

1层LSTM，隐藏层的神经元个数为128，输出层为1个预测值，迭代次数为100。

Tips: LSTM参数计算

(hidden size × (hidden size + x_dim) + hidden size) × 4
x_dim为输入数据的特征维度，这里是1。

``````model = Sequential()
model.summary()
history = model.fit(trainX, trainY, epochs=100, batch_size=64, verbose=1)
score = model.evaluate(testX, testY, batch_size=64, verbose=1)``````

``````def visualize_loss(history, title):
loss = history.history["loss"]
epochs = range(len(loss))
plt.figure()
plt.plot(epochs, loss, "b", label="Training loss")
plt.title(title)
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

visualize_loss(history, "Training Loss")``````

#### 预测结果

``````# 预测训练集与测试集
trainPredict = model.predict(trainX)
testPredict = model.predict(testX)

# 对预测结果进行反归一化处理
trainPredict = scaler.inverse_transform(trainPredict)
trainY = scaler.inverse_transform([trainY])
testPredict = scaler.inverse_transform(testPredict)
testY = scaler.inverse_transform([testY])

# 计算训练集与测试集的RMSE
trainScore = math.sqrt(mean_squared_error(trainY[0], trainPredict[:,0]))
print('Train Score: %.2f RMSE' % (trainScore))
testScore = math.sqrt(mean_squared_error(testY[0], testPredict[:,0]))
print('Test Score: %.2f RMSE' % (testScore))

# 绘制预测结果图
trainPredictPlot = np.empty_like(data)
trainPredictPlot[:, :] = np.nan
trainPredictPlot[time_steps:len(trainPredict) + time_steps, :] = trainPredict

testPredictPlot = np.empty_like(data)
testPredictPlot[:, :] = np.nan
testPredictPlot[len(trainPredict) + (time_steps * 2)-1:len(data) - 1, :] = testPredict

plt.plot(scaler.inverse_transform(data))
plt.plot(trainPredictPlot)
plt.plot(testPredictPlot)
plt.show()``````