link
【翻译自 : How to Develop an Encoder-Decoder Model with Attention in Keras 】
【说明:Jason Brownlee PhD大神的文章个人很喜欢,所以闲暇时间里会做一点翻译和学习实践的工作,这里是相应工作的实践记录,希望能帮到有需要的人!】
事实证明,循环神经网络的编码器-解码器架构在自然语言处理领域(例如机器翻译和字幕生成)中的许多序列到序列预测问题上非常强大。
注意是一种机制,它解决了编码器-解码器架构对长序列的限制,并且通常可以加速学习并提升模型的技能,无序列到序列预测问题。
在本教程中,您将了解如何使用 Keras 在 Python 中开发具有注意力的编码器-解码器循环神经网络。完成本教程后,您将了解:
- 如何设计一个小且可配置的问题来评估编码器 - 解码器循环神经网络有/无注意力。
- 如何设计和评估编码器-解码器网络,注意和不注意序列预测问题。
- 如何在有注意力和无注意力的情况下稳健地比较编码器-解码器网络的性能。
教程概述
本教程分为6个部分; 他们是:
带注意力的编码器-解码器 注意力测试 无需注意的编码器-解码器 自定义 Keras 注意力层 带注意力的编码器-解码器 模型比较
Python环境
本教程假设您已安装 Python 3 SciPy 环境。您必须使用 TensorFlow 或 Theano 后端安装 Keras(2.0 或更高版本)。本教程还假设您已安装 scikit-learn、Pandas、NumPy 和 Matplotlib。
带注意力的编码器-解码器
循环神经网络的编码器-解码器模型是一种用于序列到序列预测问题的架构。顾名思义,它由两个子模型组成:
编码器: 编码器负责逐步遍历输入的时间步长,并将整个序列编码成一个固定长度的向量,称为上下文向量。 解码器: 解码器负责在从上下文向量中读取时逐步执行输出时间步骤。
该架构的一个问题是在长输入或输出序列上的性能很差。原因被认为是因为编码器使用了固定大小的内部表示。注意力是解决此限制的体系结构的扩展。它的工作原理是首先提供从编码器到解码器的更丰富的上下文,以及一种学习机制,其中解码器可以在预测输出序列中的每个时间步长时,在更丰富的编码中学习要注意的位置。
有关编码器-解码器架构的更多关注,请参阅以下帖子:
长短期记忆循环神经网络中的注意力
注意力在编码器-解码器循环神经网络中是如何工作的
注意力测试
在我们开发带有注意力的模型之前,我们将首先定义一个人为的可扩展测试问题,我们可以用它来确定注意力是否提供任何好处。在这个问题中,我们将生成随机整数序列作为输入,并匹配由输入序列中整数子集组成的输出序列。
例如,输入序列可能是 [1, 6, 2, 7, 3],预期输出序列可能是序列 [1, 6] 中的前两个随机整数。我们将定义问题,使得输入和输出序列的长度相同,并根据需要用“0”值填充输出序列。
首先,我们需要一个函数来生成随机整数序列。我们将使用 Python randint() 函数生成 0 到最大值之间的随机整数,并将此范围用作问题的基数。下面的函数 generate_sequence() 将生成一个具有固定长度和指定基数的随机整数序列。
- from random import randint
-
- # generate a sequence of random integers
- def generate_sequence(length, n_unique):
- return [randint(0, n_unique-1) for _ in range(length)]
-
- # generate random sequence
- sequence = generate_sequence(5, 50)
- print(sequence)
运行此示例会生成 5 个时间步长的序列,其中序列中的每个值都是 0 到 49 之间的随机整数。
[43, 3, 28, 34, 33]
接下来,我们需要一个函数来将离散整数值编码为二进制向量。如果使用基数 50,则每个整数将由指定整数值索引中的 0 值和 1 的 50 元素向量表示。下面的 one_hot_encode() 函数将对给定的整数序列进行one-hot编码。
- # one hot encode sequence
- def one_hot_encode(sequence, n_unique):
- encoding = list()
- for value in sequence:
- vector = [0 for _ in range(n_unique)]
- vector[value] = 1
- encoding.append(vector)
- return array(encoding)
我们还需要能够解码编码序列。 这将需要将来自模型的预测或编码的预期序列转换回我们可以读取和评估的整数序列。下面的 one_hot_decode() 函数会将编码序列解码回整数序列。
- # decode a one hot encoded string
- def one_hot_decode(encoded_seq):
- return [argmax(vector) for vector in encoded_seq]
我们可以在下面的示例中测试这些操作。
- from random import randint
- from numpy import array
- from numpy import argmax
-
- # generate a sequence of random integers
- def generate_sequence(length, n_unique):
- return [randint(0, n_unique-1) for _ in range(length)]
-
- # one hot encode sequence
- def one_hot_encode(sequence, n_unique):
- encoding = list()
- for value in sequence:
- vector = [0 for _ in range(n_unique)]
- vector[value] = 1
- encoding.append(vector)
- return array(encoding)
-
- # decode a one hot encoded string
- def one_hot_decode(encoded_seq):
- return [argmax(vector) for vector in encoded_seq]
-
- # generate random sequence
- sequence = generate_sequence(5, 50)
- print(sequence)
- # one hot encode
- encoded = one_hot_encode(sequence, 50)
- print(encoded)
- # decode
- decoded = one_hot_decode(encoded)
- print(decoded)
运行该示例首先打印一个随机生成的序列,然后打印一个编码版本,最后再次打印解码序列。
- [3, 18, 32, 11, 36]
- [[0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
- [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
- [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
- [0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
- [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0]]
- [3, 18, 32, 11, 36]
最后,我们需要一个函数来创建输入和输出序列对来训练和评估模型。
下面名为 get_pair() 的函数将返回一个输入和输出序列对,给定指定的输入长度、输出长度和基数。 输入和输出序列的长度相同,即输入序列的长度,但输出序列将作为输入序列的前 n 个字符,并用零值填充到所需长度。然后对整数序列进行编码,然后重新整形为循环神经网络所需的 3D 格式,其维度为:样本、时间步长和特征。 在这种情况下,样本始终为 1,因为我们只生成一对输入-输出,时间步长是输入序列长度,特征是每个时间步长的基数。
- # prepare data for the LSTM
- def get_pair(n_in, n_out, n_unique):
- # generate random sequence
- sequence_in = generate_sequence(n_in, n_unique)
- sequence_out = sequence_in[:n_out] + [0 for _ in range(n_in-n_out)]
- # one hot encode
- X = one_hot_encode(sequence_in, n_unique)
- y = one_hot_encode(sequence_out, n_unique)
- # reshape as 3D
- X = X.reshape((1, X.shape[0], X.shape[1]))
- y = y.reshape((1, y.shape[0], y.shape[1]))
- return X,y
完整代码如下:
- from random import randint
- from numpy import array
- from numpy import argmax
-
- # generate a sequence of random integers
- def generate_sequence(length, n_unique):
- return [randint(0, n_unique-1) for _ in range(length)]
-
- # one hot encode sequence
- def one_hot_encode(sequence, n_unique):
- encoding = list()
- for value in sequence:
- vector = [0 for _ in range(n_unique)]
- vector[value] = 1
- encoding.append(vector)
- return array(encoding)
-
- # decode a one hot encoded string
- def one_hot_decode(encoded_seq):
- return [argmax(vector) for vector in encoded_seq]
-
- # prepare data for the LSTM
- def get_pair(n_in, n_out, n_unique):
- # generate random sequence
- sequence_in = generate_sequence(n_in, n_unique)
- sequence_out = sequence_in[:n_out] + [0 for _ in range(n_in-n_out)]
- # one hot encode
- X = one_hot_encode(sequence_in, n_unique)
- y = one_hot_encode(sequence_out, n_unique)
- # reshape as 3D
- X = X.reshape((1, X.shape[0], X.shape[1]))
- y = y.reshape((1, y.shape[0], y.shape[1]))
- return X,y
-
- # generate random sequence
- X, y = get_pair(5, 2, 50)
- print(X.shape, y.shape)
- print('X=%s, y=%s' % (one_hot_decode(X[0]), one_hot_decode(y[0])))
运行该示例会生成一个输入-输出对并打印两个数组的形状。生成的对然后以解码的形式打印,我们可以看到序列的前两个整数在输出序列中被复制,然后是零值的填充。
- (1, 5, 50) (1, 5, 50)
- X=[12, 20, 36, 40, 12], y=[12, 20, 0, 0, 0]
无需注意力的编码器-解码器
在本节中,我们将在没有注意力的情况下使用编码器 - 解码器模型开发该问题的性能基线。我们将在 5 个时间步长的输入和输出序列、输出序列中输入序列的前 2 个元素和基数 50 处修复问题定义。
- # configure problem
- n_features = 50
- n_timesteps_in = 5
- n_timesteps_out = 2
我们可以在 Keras 中开发一个简单的编码器 - 解码器模型,方法是从编码器 LSTM 模型中获取输出,针对输出序列中的时间步数重复 n 次,然后使用解码器来预测输出序列。我们将使用相同数量的单元配置编码器和解码器,在本例中为 150。我们将使用梯度下降的高效 Adam 实现并优化分类交叉熵损失函数,因为该问题在技术上是一个多类分类问题 .该模型的配置是经过一些试验和错误后发现的,并没有经过优化。
下面列出了 Keras 中编码器-解码器架构的代码。
- # define model
- model = Sequential()
- model.add(LSTM(150, input_shape=(n_timesteps_in, n_features)))
- model.add(RepeatVector(n_timesteps_in))
- model.add(LSTM(150, return_sequences=True))
- model.add(TimeDistributed(Dense(n_features, activation='softmax')))
- model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
我们将在 5,000 个随机输入-输出整数序列对上训练模型。
- # train LSTM
- for epoch in range(5000):
- # generate new random sequence
- X,y = get_pair(n_timesteps_in, n_timesteps_out, n_features)
- # fit model for one epoch on this sequence
- model.fit(X, y, epochs=1, verbose=2)
训练完成后,我们将在 100 个新的随机生成的整数序列上评估模型,并且仅在整个输出序列与预期值匹配时才将预测标记为正确。
- # evaluate LSTM
- total, correct = 100, 0
- for _ in range(total):
- X,y = get_pair(n_timesteps_in, n_timesteps_out, n_features)
- yhat = model.predict(X, verbose=0)
- if array_equal(one_hot_decode(y[0]), one_hot_decode(yhat[0])):
- correct += 1
- print('Accuracy: %.2f%%' % (float(correct)/float(total)*100.0))
最后,我们将打印 10 个预期输出序列和模型预测序列的示例,下面列出了完整的示例。
- from random import randint
- from numpy import array
- from numpy import argmax
- from numpy import array_equal
- from keras.models import Sequential
- from keras.layers import LSTM
- from keras.layers import Dense
- from keras.layers import TimeDistributed
- from keras.layers import RepeatVector
-
- # generate a sequence of random integers
- def generate_sequence(length, n_unique):
- return [randint(0, n_unique-1) for _ in range(length)]
-
- # one hot encode sequence
- def one_hot_encode(sequence, n_unique):
- encoding = list()
- for value in sequence:
- vector = [0 for _ in range(n_unique)]
- vector[value] = 1
- encoding.append(vector)
- return array(encoding)
-
- # decode a one hot encoded string
- def one_hot_decode(encoded_seq):
- return [argmax(vector) for vector in encoded_seq]
-
- # prepare data for the LSTM
- def get_pair(n_in, n_out, cardinality):
- # generate random sequence
- sequence_in = generate_sequence(n_in, cardinality)
- sequence_out = sequence_in[:n_out] + [0 for _ in range(n_in-n_out)]
- # one hot encode
- X = one_hot_encode(sequence_in, cardinality)
- y = one_hot_encode(sequence_out, cardinality)
- # reshape as 3D
- X = X.reshape((1, X.shape[0], X.shape[1]))
- y = y.reshape((1, y.shape[0], y.shape[1]))
- return X,y
-
- # configure problem
- n_features = 50
- n_timesteps_in = 5
- n_timesteps_out = 2
- # define model
- model = Sequential()
- model.add(LSTM(150, input_shape=(n_timesteps_in, n_features)))
- model.add(RepeatVector(n_timesteps_in))
- model.add(LSTM(150, return_sequences=True))
- model.add(TimeDistributed(Dense(n_features, activation='softmax')))
- model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
- # train LSTM
- for epoch in range(5000):
- # generate new random sequence
- X,y = get_pair(n_timesteps_in, n_timesteps_out, n_features)
- # fit model for one epoch on this sequence
- model.fit(X, y, epochs=1, verbose=2)
- # evaluate LSTM
- total, correct = 100, 0
- for _ in range(total):
- X,y = get_pair(n_timesteps_in, n_timesteps_out, n_features)
- yhat = model.predict(X, verbose=0)
- if array_equal(one_hot_decode(y[0]), one_hot_decode(yhat[0])):
- correct += 1
- print('Accuracy: %.2f%%' % (float(correct)/float(total)*100.0))
- # spot check some examples
- for _ in range(10):
- X,y = get_pair(n_timesteps_in, n_timesteps_out, n_features)
- yhat = model.predict(X, verbose=0)
- print('Expected:', one_hot_decode(y[0]), 'Predicted', one_hot_decode(yhat[0]))
运行这个例子不会花很长时间,也许在 CPU 上几分钟,不需要 GPU。
注意:由于算法或评估程序的随机性或数值精度的差异,您的结果可能会有所不同。 考虑多次运行该示例并比较平均结果。
我 们可以从样本输出中看到,对于大多数或所有情况,模型确实在输出序列中得到了一个正确的数字,并且只在第二个数字上挣扎。 所有零填充值都被正确预测。
- Expected: [47, 0, 0, 0, 0] Predicted [47, 47, 0, 0, 0]
- Expected: [43, 31, 0, 0, 0] Predicted [43, 31, 0, 0, 0]
- Expected: [14, 22, 0, 0, 0] Predicted [14, 14, 0, 0, 0]
- Expected: [39, 31, 0, 0, 0] Predicted [39, 39, 0, 0, 0]
- Expected: [6, 4, 0, 0, 0] Predicted [6, 4, 0, 0, 0]
- Expected: [47, 0, 0, 0, 0] Predicted [47, 47, 0, 0, 0]
- Expected: [39, 33, 0, 0, 0] Predicted [39, 39, 0, 0, 0]
- Expected: [23, 2, 0, 0, 0] Predicted [23, 23, 0, 0, 0]
- Expected: [19, 28, 0, 0, 0] Predicted [19, 3, 0, 0, 0]
- Expected: [32, 33, 0, 0, 0] Predicted [32, 32, 0, 0, 0]
自定义 Keras 注意力层
现在我们需要关注编码器-解码器模型。在撰写本文时,Keras 还没有内置于库中的注意力功能,但即将推出。在 Keras 中正式提供注意力之前,我们可以开发自己的实现或使用现有的第三方实现。为了加快速度,让我们使用现有的第三方实现。
Datalogue 的实习生 Zafarali Ahmed 在 2017 年的一篇题为“如何在 Keras 中使用注意力可视化您的循环神经网络”和名为“keras-attention”的 GitHub 项目中为 Keras 开发了一个自定义层,为注意力提供支持。
自定义注意力层称为 AttentionDecoder,可在 GitHub 项目的 custom_recurrents.py 文件中找到。我们可以在项目的 GNU Affero General Public License v3.0 许可下重用此代码。为完整起见,下面列出了自定义图层的副本。将其复制并粘贴到当前工作目录中名为“attention_decoder.py”的一个新的单独文件中。
- import tensorflow as tf
- from keras import backend as K
- from keras import regularizers, constraints, initializers, activations
- from keras.layers.recurrent import Recurrent, _time_distributed_dense
- from keras.engine import InputSpec
-
- tfPrint = lambda d, T: tf.Print(input_=T, data=[T, tf.shape(T)], message=d)
-
- class AttentionDecoder(Recurrent):
-
- def __init__(self, units, output_dim,
- activation='tanh',
- return_probabilities=False,
- name='AttentionDecoder',
- kernel_initializer='glorot_uniform',
- recurrent_initializer='orthogonal',
- bias_initializer='zeros',
- kernel_regularizer=None,
- bias_regularizer=None,
- activity_regularizer=None,
- kernel_constraint=None,
- bias_constraint=None,
- **kwargs):
- """
- Implements an AttentionDecoder that takes in a sequence encoded by an
- encoder and outputs the decoded states
- :param units: dimension of the hidden state and the attention matrices
- :param output_dim: the number of labels in the output space
- references:
- Bahdanau, Dzmitry, Kyunghyun Cho, and Yoshua Bengio.
- "Neural machine translation by jointly learning to align and translate."
- arXiv preprint arXiv:1409.0473 (2014).
- """
- self.units = units
- self.output_dim = output_dim
- self.return_probabilities = return_probabilities
- self.activation = activations.get(activation)
- self.kernel_initializer = initializers.get(kernel_initializer)
- self.recurrent_initializer = initializers.get(recurrent_initializer)
- self.bias_initializer = initializers.get(bias_initializer)
-
- self.kernel_regularizer = regularizers.get(kernel_regularizer)
- self.recurrent_regularizer = regularizers.get(kernel_regularizer)
- self.bias_regularizer = regularizers.get(bias_regularizer)
- self.activity_regularizer = regularizers.get(activity_regularizer)
-
- self.kernel_constraint = constraints.get(kernel_constraint)
- self.recurrent_constraint = constraints.get(kernel_constraint)
- self.bias_constraint = constraints.get(bias_constraint)
-
- super(AttentionDecoder, self).__init__(**kwargs)
- self.name = name
- self.return_sequences = True # must return sequences
-
- def build(self, input_shape):
- """
- See Appendix 2 of Bahdanau 2014, arXiv:1409.0473
- for model details that correspond to the matrices here.
- """
-
- self.batch_size, self.timesteps, self.input_dim = input_shape
-
- if self.stateful:
- super(AttentionDecoder, self).reset_states()
-
- self.states = [None, None] # y, s
-
- """
- Matrices for creating the context vector
- """
-
- self.V_a = self.add_weight(shape=(self.units,),
- name='V_a',
- initializer=self.kernel_initializer,
- regularizer=self.kernel_regularizer,
- constraint=self.kernel_constraint)
- self.W_a = self.add_weight(shape=(self.units, self.units),
- name='W_a',
- initializer=self.kernel_initializer,
- regularizer=self.kernel_regularizer,
- constraint=self.kernel_constraint)
- self.U_a = self.add_weight(shape=(self.input_dim, self.units),
- name='U_a',
- initializer=self.kernel_initializer,
- regularizer=self.kernel_regularizer,
- constraint=self.kernel_constraint)
- self.b_a = self.add_weight(shape=(self.units,),
- name='b_a',
- initializer=self.bias_initializer,
- regularizer=self.bias_regularizer,
- constraint=self.bias_constraint)
- """
- Matrices for the r (reset) gate
- """
- self.C_r = self.add_weight(shape=(self.input_dim, self.units),
- name='C_r',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
- self.U_r = self.add_weight(shape=(self.units, self.units),
- name='U_r',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
- self.W_r = self.add_weight(shape=(self.output_dim, self.units),
- name='W_r',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
- self.b_r = self.add_weight(shape=(self.units, ),
- name='b_r',
- initializer=self.bias_initializer,
- regularizer=self.bias_regularizer,
- constraint=self.bias_constraint)
-
- """
- Matrices for the z (update) gate
- """
- self.C_z = self.add_weight(shape=(self.input_dim, self.units),
- name='C_z',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
- self.U_z = self.add_weight(shape=(self.units, self.units),
- name='U_z',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
- self.W_z = self.add_weight(shape=(self.output_dim, self.units),
- name='W_z',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
- self.b_z = self.add_weight(shape=(self.units, ),
- name='b_z',
- initializer=self.bias_initializer,
- regularizer=self.bias_regularizer,
- constraint=self.bias_constraint)
- """
- Matrices for the proposal
- """
- self.C_p = self.add_weight(shape=(self.input_dim, self.units),
- name='C_p',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
- self.U_p = self.add_weight(shape=(self.units, self.units),
- name='U_p',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
- self.W_p = self.add_weight(shape=(self.output_dim, self.units),
- name='W_p',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
- self.b_p = self.add_weight(shape=(self.units, ),
- name='b_p',
- initializer=self.bias_initializer,
- regularizer=self.bias_regularizer,
- constraint=self.bias_constraint)
- """
- Matrices for making the final prediction vector
- """
- self.C_o = self.add_weight(shape=(self.input_dim, self.output_dim),
- name='C_o',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
- self.U_o = self.add_weight(shape=(self.units, self.output_dim),
- name='U_o',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
- self.W_o = self.add_weight(shape=(self.output_dim, self.output_dim),
- name='W_o',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
- self.b_o = self.add_weight(shape=(self.output_dim, ),
- name='b_o',
- initializer=self.bias_initializer,
- regularizer=self.bias_regularizer,
- constraint=self.bias_constraint)
-
- # For creating the initial state:
- self.W_s = self.add_weight(shape=(self.input_dim, self.units),
- name='W_s',
- initializer=self.recurrent_initializer,
- regularizer=self.recurrent_regularizer,
- constraint=self.recurrent_constraint)
-
- self.input_spec = [
- InputSpec(shape=(self.batch_size, self.timesteps, self.input_dim))]
- self.built = True
-
- def call(self, x):
- # store the whole sequence so we can "attend" to it at each timestep
- self.x_seq = x
-
- # apply the a dense layer over the time dimension of the sequence
- # do it here because it doesn't depend on any previous steps
- # thefore we can save computation time:
- self._uxpb = _time_distributed_dense(self.x_seq, self.U_a, b=self.b_a,
- input_dim=self.input_dim,
- timesteps=self.timesteps,
- output_dim=self.units)
-
- return super(AttentionDecoder, self).call(x)
-
- def get_initial_state(self, inputs):
- # apply the matrix on the first time step to get the initial s0.
- s0 = activations.tanh(K.dot(inputs[:, 0], self.W_s))
-
- # from keras.layers.recurrent to initialize a vector of (batchsize,
- # output_dim)
- y0 = K.zeros_like(inputs) # (samples, timesteps, input_dims)
- y0 = K.sum(y0, axis=(1, 2)) # (samples, )
- y0 = K.expand_dims(y0) # (samples, 1)
- y0 = K.tile(y0, [1, self.output_dim])
-
- return [y0, s0]
-
- def step(self, x, states):
-
- ytm, stm = states
-
- # repeat the hidden state to the length of the sequence
- _stm = K.repeat(stm, self.timesteps)
-
- # now multiplty the weight matrix with the repeated hidden state
- _Wxstm = K.dot(_stm, self.W_a)
-
- # calculate the attention probabilities
- # this relates how much other timesteps contributed to this one.
- et = K.dot(activations.tanh(_Wxstm + self._uxpb),
- K.expand_dims(self.V_a))
- at = K.exp(et)
- at_sum = K.sum(at, axis=1)
- at_sum_repeated = K.repeat(at_sum, self.timesteps)
- at /= at_sum_repeated # vector of size (batchsize, timesteps, 1)
-
- # calculate the context vector
- context = K.squeeze(K.batch_dot(at, self.x_seq, axes=1), axis=1)
- # ~~~> calculate new hidden state
- # first calculate the "r" gate:
-
- rt = activations.sigmoid(
- K.dot(ytm, self.W_r)
- + K.dot(stm, self.U_r)
- + K.dot(context, self.C_r)
- + self.b_r)
-
- # now calculate the "z" gate
- zt = activations.sigmoid(
- K.dot(ytm, self.W_z)
- + K.dot(stm, self.U_z)
- + K.dot(context, self.C_z)
- + self.b_z)
-
- # calculate the proposal hidden state:
- s_tp = activations.tanh(
- K.dot(ytm, self.W_p)
- + K.dot((rt * stm), self.U_p)
- + K.dot(context, self.C_p)
- + self.b_p)
-
- # new hidden state:
- st = (1-zt)*stm + zt * s_tp
-
- yt = activations.softmax(
- K.dot(ytm, self.W_o)
- + K.dot(stm, self.U_o)
- + K.dot(context, self.C_o)
- + self.b_o)
-
- if self.return_probabilities:
- return at, [yt, st]
- else:
- return yt, [yt, st]
-
- def compute_output_shape(self, input_shape):
- """
- For Keras internal compatability checking
- """
- if self.return_probabilities:
- return (None, self.timesteps, self.timesteps)
- else:
- return (None, self.timesteps, self.output_dim)
-
- def get_config(self):
- """
- For rebuilding models on load time.
- """
- config = {
- 'output_dim': self.output_dim,
- 'units': self.units,
- 'return_probabilities': self.return_probabilities
- }
- base_config = super(AttentionDecoder, self).get_config()
- return dict(list(base_config.items()) + list(config.items()))
我们可以通过如下导入来在我们的项目中使用这个自定义层:
from attention_decoder import AttentionDecoder
该层实现了 Bahdanau 等人所描述的注意力。 在他们的论文“通过联合学习对齐和翻译的神经机器翻译”中。该代码在原始帖子中得到了很好的解释,并链接到 LSTM 和注意力方程。这种实现的一个限制是它必须输出与输入序列长度相同的序列,这是编码器-解码器架构旨在克服的特定限制。
重要的是,层由第二个 LSTM 执行的解码的重复,以及由编码器 - 解码器模型中的密集输出层执行的模型的 softmax 输出,而无需注意。 这大大简化了模型的代码。需要注意的是,自定义层建立在 Keras 中的 Recurrent 层之上,在撰写本文时,该层被标记为遗留代码,并且可能会在某个时候从项目中删除。
编码器-解码器注意力
现在我们有了一个可以使用的注意力实现,我们可以开发一个编码器-解码器模型,并针对我们人为的序列预测问题进行关注。具有注意力层的模型定义如下。 我们可以看到该层处理编码器-解码器模型本身的一些机制,从而使模型定义更简单。
- # define model
- model = Sequential()
- model.add(LSTM(150, input_shape=(n_timesteps_in, n_features), return_sequences=True))
- model.add(AttentionDecoder(150, n_features))
- model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
完整代码如下:
- from random import randint
- from numpy import array
- from numpy import argmax
- from numpy import array_equal
- from keras.models import Sequential
- from keras.layers import LSTM
- from attention_decoder import AttentionDecoder
-
- # generate a sequence of random integers
- def generate_sequence(length, n_unique):
- return [randint(0, n_unique-1) for _ in range(length)]
-
- # one hot encode sequence
- def one_hot_encode(sequence, n_unique):
- encoding = list()
- for value in sequence:
- vector = [0 for _ in range(n_unique)]
- vector[value] = 1
- encoding.append(vector)
- return array(encoding)
-
- # decode a one hot encoded string
- def one_hot_decode(encoded_seq):
- return [argmax(vector) for vector in encoded_seq]
-
- # prepare data for the LSTM
- def get_pair(n_in, n_out, cardinality):
- # generate random sequence
- sequence_in = generate_sequence(n_in, cardinality)
- sequence_out = sequence_in[:n_out] + [0 for _ in range(n_in-n_out)]
- # one hot encode
- X = one_hot_encode(sequence_in, cardinality)
- y = one_hot_encode(sequence_out, cardinality)
- # reshape as 3D
- X = X.reshape((1, X.shape[0], X.shape[1]))
- y = y.reshape((1, y.shape[0], y.shape[1]))
- return X,y
-
- # configure problem
- n_features = 50
- n_timesteps_in = 5
- n_timesteps_out = 2
-
- # define model
- model = Sequential()
- model.add(LSTM(150, input_shape=(n_timesteps_in, n_features), return_sequences=True))
- model.add(AttentionDecoder(150, n_features))
- model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
- # train LSTM
- for epoch in range(5000):
- # generate new random sequence
- X,y = get_pair(n_timesteps_in, n_timesteps_out, n_features)
- # fit model for one epoch on this sequence
- model.fit(X, y, epochs=1, verbose=2)
- # evaluate LSTM
- total, correct = 100, 0
- for _ in range(total):
- X,y = get_pair(n_timesteps_in, n_timesteps_out, n_features)
- yhat = model.predict(X, verbose=0)
- if array_equal(one_hot_decode(y[0]), one_hot_decode(yhat[0])):
- correct += 1
- print('Accuracy: %.2f%%' % (float(correct)/float(total)*100.0))
- # spot check some examples
- for _ in range(10):
- X,y = get_pair(n_timesteps_in, n_timesteps_out, n_features)
- yhat = model.predict(X, verbose=0)
- print('Expected:', one_hot_decode(y[0]), 'Predicted', one_hot_decode(yhat[0]))
运行该示例会在 100 个随机生成的输入-输出对上打印模型的输出。
注意:由于算法或评估程序的随机性或数值精度的差异,您的结果可能会有所不同。 考虑多次运行该示例并比较平均结果。
在相同的资源和相同的训练量下,有注意力的模型表现要好得多。
Accuracy: 95.00%
抽查一些样本输出和预测序列,我们可以看到很少的错误,即使在前两个元素中存在零值的情况下也是如此。
- Expected: [48, 47, 0, 0, 0] Predicted [48, 47, 0, 0, 0]
- Expected: [7, 46, 0, 0, 0] Predicted [7, 46, 0, 0, 0]
- Expected: [32, 30, 0, 0, 0] Predicted [32, 2, 0, 0, 0]
- Expected: [3, 25, 0, 0, 0] Predicted [3, 25, 0, 0, 0]
- Expected: [45, 4, 0, 0, 0] Predicted [45, 4, 0, 0, 0]
- Expected: [49, 9, 0, 0, 0] Predicted [49, 9, 0, 0, 0]
- Expected: [22, 23, 0, 0, 0] Predicted [22, 23, 0, 0, 0]
- Expected: [29, 36, 0, 0, 0] Predicted [29, 36, 0, 0, 0]
- Expected: [0, 29, 0, 0, 0] Predicted [0, 29, 0, 0, 0]
- Expected: [11, 26, 0, 0, 0] Predicted [11, 26, 0, 0, 0]
模型比较
尽管我们通过注意力从模型中获得了更好的结果,但结果是从每个模型的单次运行中报告的。在这种情况下,我们通过多次重复评估每个模型并报告这些运行的平均性能来寻求更可靠的发现。 有关这种评估神经网络模型的稳健方法的更多信息,请参阅帖子:
如何评估深度学习模型的技能
我们可以定义一个函数来创建每种类型的模型,如下所示。
- # define the encoder-decoder model
- def baseline_model(n_timesteps_in, n_features):
- model = Sequential()
- model.add(LSTM(150, input_shape=(n_timesteps_in, n_features)))
- model.add(RepeatVector(n_timesteps_in))
- model.add(LSTM(150, return_sequences=True))
- model.add(TimeDistributed(Dense(n_features, activation='softmax')))
- model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
- return model
-
- # define the encoder-decoder with attention model
- def attention_model(n_timesteps_in, n_features):
- model = Sequential()
- model.add(LSTM(150, input_shape=(n_timesteps_in, n_features), return_sequences=True))
- model.add(AttentionDecoder(150, n_features))
- model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
- return model
然后我们可以定义一个函数来拟合和评估拟合模型的准确性并返回准确性分数。
- # train and evaluate a model, return accuracy
- def train_evaluate_model(model, n_timesteps_in, n_timesteps_out, n_features):
- # train LSTM
- for epoch in range(5000):
- # generate new random sequence
- X,y = get_pair(n_timesteps_in, n_timesteps_out, n_features)
- # fit model for one epoch on this sequence
- model.fit(X, y, epochs=1, verbose=0)
- # evaluate LSTM
- total, correct = 100, 0
- for _ in range(total):
- X,y = get_pair(n_timesteps_in, n_timesteps_out, n_features)
- yhat = model.predict(X, verbose=0)
- if array_equal(one_hot_decode(y[0]), one_hot_decode(yhat[0])):
- correct += 1
- return float(correct)/float(total)*100.0
将这些放在一起,我们可以多次重复创建、训练和评估每种类型模型的过程,并报告重复的平均准确度。 为了减少运行时间,我们将每个模型评估重复 10 次,但如果您有资源,您可以将次数增加到 30 或 100 次。下面列出了完整的示例。
- from random import randint
- from numpy import array
- from numpy import argmax
- from numpy import array_equal
- from keras.models import Sequential
- from keras.layers import LSTM
- from keras.layers import Dense
- from keras.layers import TimeDistributed
- from keras.layers import RepeatVector
- from attention_decoder import AttentionDecoder
-
- # generate a sequence of random integers
- def generate_sequence(length, n_unique):
- return [randint(0, n_unique-1) for _ in range(length)]
-
- # one hot encode sequence
- def one_hot_encode(sequence, n_unique):
- encoding = list()
- for value in sequence:
- vector = [0 for _ in range(n_unique)]
- vector[value] = 1
- encoding.append(vector)
- return array(encoding)
-
- # decode a one hot encoded string
- def one_hot_decode(encoded_seq):
- return [argmax(vector) for vector in encoded_seq]
-
- # prepare data for the LSTM
- def get_pair(n_in, n_out, cardinality):
- # generate random sequence
- sequence_in = generate_sequence(n_in, cardinality)
- sequence_out = sequence_in[:n_out] + [0 for _ in range(n_in-n_out)]
- # one hot encode
- X = one_hot_encode(sequence_in, cardinality)
- y = one_hot_encode(sequence_out, cardinality)
- # reshape as 3D
- X = X.reshape((1, X.shape[0], X.shape[1]))
- y = y.reshape((1, y.shape[0], y.shape[1]))
- return X,y
-
- # define the encoder-decoder model
- def baseline_model(n_timesteps_in, n_features):
- model = Sequential()
- model.add(LSTM(150, input_shape=(n_timesteps_in, n_features)))
- model.add(RepeatVector(n_timesteps_in))
- model.add(LSTM(150, return_sequences=True))
- model.add(TimeDistributed(Dense(n_features, activation='softmax')))
- model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
- return model
-
- # define the encoder-decoder with attention model
- def attention_model(n_timesteps_in, n_features):
- model = Sequential()
- model.add(LSTM(150, input_shape=(n_timesteps_in, n_features), return_sequences=True))
- model.add(AttentionDecoder(150, n_features))
- model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
- return model
-
- # train and evaluate a model, return accuracy
- def train_evaluate_model(model, n_timesteps_in, n_timesteps_out, n_features):
- # train LSTM
- for epoch in range(5000):
- # generate new random sequence
- X,y = get_pair(n_timesteps_in, n_timesteps_out, n_features)
- # fit model for one epoch on this sequence
- model.fit(X, y, epochs=1, verbose=0)
- # evaluate LSTM
- total, correct = 100, 0
- for _ in range(total):
- X,y = get_pair(n_timesteps_in, n_timesteps_out, n_features)
- yhat = model.predict(X, verbose=0)
- if array_equal(one_hot_decode(y[0]), one_hot_decode(yhat[0])):
- correct += 1
- return float(correct)/float(total)*100.0
-
- # configure problem
- n_features = 50
- n_timesteps_in = 5
- n_timesteps_out = 2
- n_repeats = 10
- # evaluate encoder-decoder model
- print('Encoder-Decoder Model')
- results = list()
- for _ in range(n_repeats):
- model = baseline_model(n_timesteps_in, n_features)
- accuracy = train_evaluate_model(model, n_timesteps_in, n_timesteps_out, n_features)
- results.append(accuracy)
- print(accuracy)
- print('Mean Accuracy: %.2f%%' % (sum(results)/float(n_repeats)))
- # evaluate encoder-decoder with attention model
- print('Encoder-Decoder With Attention Model')
- results = list()
- for _ in range(n_repeats):
- model = attention_model(n_timesteps_in, n_features)
- accuracy = train_evaluate_model(model, n_timesteps_in, n_timesteps_out, n_features)
- results.append(accuracy)
- print(accuracy)
- print('Mean Accuracy: %.2f%%' % (sum(results)/float(n_repeats)))
注意:由于算法或评估程序的随机性或数值精度的差异,您的结果可能会有所不同。 考虑多次运行该示例并比较平均结果。
运行此示例会打印每个模型重复的准确度,让您了解运行的进度。
- Encoder-Decoder Model
- 20.0
- 23.0
- 23.0
- 18.0
- 28.000000000000004
- 28.999999999999996
- 23.0
- 26.0
- 21.0
- 20.0
- Mean Accuracy: 23.10%
-
- Encoder-Decoder With Attention Model
- 98.0
- 91.0
- 94.0
- 93.0
- 96.0
- 99.0
- 97.0
- 94.0
- 99.0
- 96.0
- Mean Accuracy: 95.70%
我们可以看到,即使平均超过 10 次运行,注意力模型仍然比没有注意力的编码器-解码器模型表现出更好的性能,分别为 23.10% 和 95.70%。这种评估的一个很好的扩展是捕获每个模型每个时期的模型损失,取平均值,并比较有和没有注意的架构的损失如何随时间变化。我希望这个轨迹会显示注意力比非注意力模型更快、更快地获得更好的技能,进一步突出了这种方法的好处。