python-pytorch编写transformer模型实现问答0.5.00--训练和预测
- 背景
- 代码
- 训练
- 预测
- 效果
背景
代码写不了这么长,接上一篇
https://blog.csdn.net/m0_60688978/article/details/139360270
代码
# 定义解码器类
n_layers = 6 # 设置 Decoder 的层数
class Decoder(nn.Module):def __init__(self, corpus):super(Decoder, self).__init__()self.tgt_emb = nn.Embedding(vocab_size, d_embedding) # 词嵌入层self.pos_emb = nn.Embedding.from_pretrained( \get_sin_enc_table(vocab_size+1, d_embedding), freeze=True) # 位置嵌入层 self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)]) # 叠加多层def forward(self, dec_inputs, enc_inputs, enc_outputs): #------------------------- 维度信息 --------------------------------# dec_inputs 的维度是 [batch_size, target_len]# enc_inputs 的维度是 [batch_size, source_len]# enc_outputs 的维度是 [batch_size, source_len, embedding_dim]#----------------------------------------------------------------- # 创建一个从 1 到 source_len 的位置索引序列pos_indices = torch.arange(1, dec_inputs.size(1) + 1).unsqueeze(0).to(dec_inputs)#------------------------- 维度信息 --------------------------------# pos_indices 的维度是 [1, target_len]#----------------------------------------------------------------- # 对输入进行词嵌入和位置嵌入相加dec_outputs = self.tgt_emb(dec_inputs) + self.pos_emb(pos_indices)#------------------------- 维度信息 --------------------------------# dec_outputs 的维度是 [batch_size, target_len, embedding_dim]#----------------------------------------------------------------- # 生成解码器自注意力掩码和解码器 - 编码器注意力掩码dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs) # 填充位掩码dec_self_attn_subsequent_mask = get_attn_subsequent_mask(dec_inputs) # 后续位掩码dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask \+ dec_self_attn_subsequent_mask), 0) dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs) # 解码器 - 编码器掩码#------------------------- 维度信息 -------------------------------- # dec_self_attn_pad_mask 的维度是 [batch_size, target_len, target_len]# dec_self_attn_subsequent_mask 的维度是 [batch_size, target_len, target_len]# dec_self_attn_mask 的维度是 [batch_size, target_len, target_len]# dec_enc_attn_mask 的维度是 [batch_size, target_len, source_len]#----------------------------------------------------------------- dec_self_attns, dec_enc_attns = [], [] # 初始化 dec_self_attns, dec_enc_attns# 通过解码器层 [batch_size, seq_len, embedding_dim]for layer in self.layers:dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)dec_self_attns.append(dec_self_attn)dec_enc_attns.append(dec_enc_attn)#------------------------- 维度信息 --------------------------------# dec_outputs 的维度是 [batch_size, target_len, embedding_dim]# dec_self_attns 是一个列表,每个元素的维度是 [batch_size, n_heads, target_len, target_len]# dec_enc_attns 是一个列表,每个元素的维度是 [batch_size, n_heads, target_len, source_len]#----------------------------------------------------------------- # 返回解码器输出,解码器自注意力和解码器 - 编码器注意力权重 return dec_outputs, dec_self_attns, dec_enc_attns# 定义 Transformer 模型
class Transformer(nn.Module):def __init__(self):super(Transformer, self).__init__() self.encoder = Encoder(encoder_input) # 初始化编码器实例 self.decoder = Decoder(decoder_input) # 初始化解码器实例# 定义线性投影层,将解码器输出转换为目标词汇表大小的概率分布self.projection = nn.Linear(d_embedding, vocab_size, bias=False)def forward(self, enc_inputs, dec_inputs):#------------------------- 维度信息 --------------------------------# enc_inputs 的维度是 [batch_size, source_seq_len]# dec_inputs 的维度是 [batch_size, target_seq_len]#----------------------------------------------------------------- # 将输入传递给编码器,并获取编码器输出和自注意力权重 enc_outputs, enc_self_attns = self.encoder(enc_inputs)#------------------------- 维度信息 --------------------------------# enc_outputs 的维度是 [batch_size, source_len, embedding_dim]# enc_self_attns 是一个列表,每个元素的维度是 [batch_size, n_heads, src_seq_len, src_seq_len] #----------------------------------------------------------------- # 将编码器输出、解码器输入和编码器输入传递给解码器# 获取解码器输出、解码器自注意力权重和编码器 - 解码器注意力权重 dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs)#------------------------- 维度信息 --------------------------------# dec_outputs 的维度是 [batch_size, target_len, embedding_dim]# dec_self_attns 是一个列表,每个元素的维度是 [batch_size, n_heads, tgt_seq_len, src_seq_len]# dec_enc_attns 是一个列表,每个元素的维度是 [batch_size, n_heads, tgt_seq_len, src_seq_len] #----------------------------------------------------------------- # 将解码器输出传递给投影层,生成目标词汇表大小的概率分布dec_logits = self.projection(dec_outputs) #------------------------- 维度信息 --------------------------------# dec_logits 的维度是 [batch_size, tgt_seq_len, tgt_vocab_size]#-----------------------------------------------------------------# 返回逻辑值 ( 原始预测结果 ), 编码器自注意力权重,解码器自注意力权重,解 - 编码器注意力权重return dec_logits, enc_self_attns, dec_self_attns, dec_enc_attns
训练
transfomer=Transformer()
import torch # 导入 torch
import torch.optim as optim # 导入优化器
model = Transformer() # 创建模型实例
print(model)
criterion = nn.CrossEntropyLoss() # 损失函数
optimizer = optim.Adam(model.parameters(), lr=0.00001) # 优化器
epochs = 40 # 训练轮次
for epoch in range(epochs): # 训练 100 轮optimizer.zero_grad() # 梯度清零
# enc_inputs, dec_inputs, target_batch = corpus.make_batch(batch_size) # 创建训练数据
# print(enc_inputs, dec_inputs, target_batch)outputs, _, _, _ = model(encoder_input, decoder_input) # 获取模型输出 loss = criterion(outputs.view(-1, vocab_size), decoder_target.view(-1)) # 计算损失if (epoch + 1) % 1 == 0: # 打印损失print(f"Epoch: {epoch + 1:04d} cost = {loss:.6f}")loss.backward()# 反向传播 optimizer.step()# 更新参数
预测
model.eval()
question_text = '张学友是哪里人'
question_cut = list(jieba.cut(question_text))
encoder_x = make_data([question_cut])
decoder_x = [[word2index['SOS']]]
encoder_x, decoder_x = torch.LongTensor(encoder_x), torch.LongTensor(decoder_x)# decoder_x=torch.tensor([[1, 0, 0, 0, 0]])decoder_x=torch.zeros(1,seq_length,dtype=torch.long)outt=1
for i in range(seq_length):decoder_x[0][i]=outtpredict, enc_self_attns, dec_self_attns, dec_enc_attns = model(encoder_x, decoder_x) # 用模型进行翻译predict = predict.view(-1,vocab_size) # 将预测结果维度重塑predict = predict.data.max(1, keepdim=True)[1] # 找到每个位置概率最大的词汇的索引outt=predict[i].item()predict, enc_self_attns, dec_self_attns, dec_enc_attns = model(encoder_x, decoder_x) # 用模型进行翻译
predictWords=predict.data.max(-1)answer = ''
for i in predictWords[1][0]:if i.item() in [2,0]:breakanswer += index2word[i.item()]
print('问题:', question_text)
print('回答:', answer)