Interested readers can run the script below on their own machine and inspect the results.
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from torchtext.datasets import AG_NEWS
from torchtext.data.utils import get_tokenizer
from collections import Counter
from torchtext.vocab import Vocab

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        # Create the positional encoding matrix, shape (max_len, d_model)
        pe = torch.zeros(max_len, d_model)
        # Positions (0, 1, 2, ..., max_len-1) as a column vector
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # Divisor term for the sine and cosine functions
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(np.log(10000.0) / d_model))
        # Apply sine to even indices and cosine to odd indices
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # Add a leading batch dimension, shape (1, max_len, d_model), so the
        # encoding broadcasts over batch-first input
        pe = pe.unsqueeze(0)
        # Register as a buffer: saved with the model but not updated during training
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Add the positional encoding to the input embeddings; x is (batch, seq_len, d_model)
        return x + self.pe[:, :x.size(1), :]

class TransformerEncoderLayerCustom(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):
        super(TransformerEncoderLayerCustom, self).__init__()
        # Simplified multi-head self-attention. Unlike the standard Transformer,
        # there are no learned Q/K/V or output projections here: the input is
        # reshaped into heads directly.
        self.d_model = d_model
        self.nhead = nhead
        self.dropout = nn.Dropout(dropout)
        # Feed-forward network: two linear layers with a ReLU in between
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        # Layer normalization
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        # Activation function
        self.activation = F.relu

    def scaled_dot_product_attention(self, query, key, value, mask=None):
        # Attention scores, scaled by sqrt(head_dim)
        scores = torch.matmul(query, key.transpose(-2, -1)) / np.sqrt(self.d_model // self.nhead)
        # Apply the mask (if any): positions where mask == 0 must not be attended to
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        # Attention weights
        attn_weights = F.softmax(scores, dim=-1)
        # Weighted sum of the values
        return torch.matmul(attn_weights, value)

    def forward(self, src, src_mask=None):
        batch_size, seq_len, _ = src.size()
        head_dim = self.d_model // self.nhead
        # Split the input into heads: (batch, nhead, seq_len, head_dim)
        query = key = value = src.view(batch_size, seq_len, self.nhead, head_dim).transpose(1, 2)
        # Multi-head attention
        attn_output = self.scaled_dot_product_attention(query, key, value, mask=src_mask)
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        attn_output = self.dropout(attn_output)
        # Residual connection and layer normalization
        src = self.norm1(src + attn_output)
        # Feed-forward network
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        # Residual connection and layer normalization
        src = self.norm2(src + src2)
        return src

class AutoregressiveTransformer(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_encoder_layers, dim_feedforward, max_len):
        super(AutoregressiveTransformer, self).__init__()
        # Embedding layer: token indices -> dense vectors
        self.embedding = nn.Embedding(vocab_size, d_model)
        # Positional encoding adds order information to the embeddings
        self.pos_encoder = PositionalEncoding(d_model, max_len)
        # Stack of encoder layers. Each layer must be constructed inside the
        # comprehension: reusing a single instance would share weights across layers.
        self.transformer_encoder = nn.ModuleList([
            TransformerEncoderLayerCustom(d_model, nhead, dim_feedforward)
            for _ in range(num_encoder_layers)
        ])
        self.d_model = d_model
        # Linear layer projecting the encoder output to vocabulary size
        self.decoder = nn.Linear(d_model, vocab_size)

    def forward(self, src, src_mask):
        # Embed the source tokens and scale by sqrt(d_model)
        src = self.embedding(src) * np.sqrt(self.d_model)
        # Add positional encodings to the embedded tokens
        src = self.pos_encoder(src)
        # Pass through every Transformer encoder layer
        for layer in self.transformer_encoder:
            src = layer(src, src_mask)
        # Project the output to vocabulary size
        output = self.decoder(src)
        return output

    def generate_square_subsequent_mask(self, sz):
        # Causal mask: a lower-triangular matrix of ones, so position i may only
        # attend to positions <= i. Zero entries are masked out with -inf inside
        # scaled_dot_product_attention.
        return torch.tril(torch.ones(sz, sz))

def train(model, data, vocab_size, num_epochs=10, learning_rate=0.0005):
    # Cross-entropy loss over next-token predictions
    criterion = nn.CrossEntropyLoss()
    # Adam optimizer
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    for epoch in range(num_epochs):
        model.train()  # put the model in training mode
        total_loss = 0
        for batch in data:
            # Input: all tokens except the last, as a batch of size 1 (batch-first)
            src = batch[:-1].unsqueeze(0)
            # Target: all tokens except the first (shifted right by one position)
            tgt = batch[1:].unsqueeze(0)
            # Causal mask for the input sequence
            src_mask = model.generate_square_subsequent_mask(src.size(1)).to(src.device)
            optimizer.zero_grad()  # zero the gradients before the backward pass
            # Forward pass through the model
            output = model(src, src_mask)
            # Loss between the model output and the target sequence
            loss = criterion(output.view(-1, vocab_size), tgt.reshape(-1))
            # Backpropagate the loss
            loss.backward()
            # Update the model parameters
            optimizer.step()
            total_loss += loss.item()
        # Print the average loss for each epoch
        print(f"Epoch {epoch+1}, Loss: {total_loss / len(data)}")

def predict(model, start_token, max_len, vocab_size):
    model.eval()  # put the model in evaluation mode
    generated_sequence = [start_token]  # initialize the output with the start token
    # Initial input tensor, shape (batch_size, seq_len) = (1, 1)
    src = torch.tensor([[start_token]])
    for _ in range(max_len - 1):
        # Causal mask for the current input sequence
        src_mask = model.generate_square_subsequent_mask(src.size(1)).to(src.device)
        # Forward pass through the model
        output = model(src, src_mask)
        # Greedy decoding: take the highest-probability token at the last position
        next_token = torch.argmax(output[0, -1, :], dim=-1).item()
        # Append the predicted token to the generated sequence
        generated_sequence.append(next_token)
        # Extend the input sequence with the new token
        src = torch.cat([src, torch.tensor([[next_token]])], dim=1)
    return generated_sequence
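Before moving on to the data, it is worth eyeballing what generate_square_subsequent_mask produces. A quick check, not part of the original script: row i of the mask has ones only at columns <= i, so no position can attend to the future.

print(torch.tril(torch.ones(4, 4)))
# tensor([[1., 0., 0., 0.],
#         [1., 1., 0., 0.],
#         [1., 1., 1., 0.],
#         [1., 1., 1., 1.]])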
# Data processing: build the vocabulary from the AG_NEWS training split
train_iter = AG_NEWS(split='train')
tokenizer = get_tokenizer('basic_english')
counter = Counter()
for (label, line) in train_iter:
    counter.update(tokenizer(line))
# Note: this Counter-based constructor comes from the older torchtext Vocab API
vocab = Vocab(counter, specials=['<unk>', '<pad>'])
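A quick peek at the result (an optional check; itos and integer lookup are attributes of the same older Vocab API):

print(len(vocab))       # total vocabulary size
print(vocab.itos[:5])   # the specials '<unk>' and '<pad>' come first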
# Hyperparameters
vocab_size = len(vocab)  # vocabulary size taken from the AG_NEWS dataset
d_model = 32  # dimension of the embedding vectors
nhead = 2  # number of attention heads
num_encoder_layers = 2  # number of Transformer encoder layers
dim_feedforward = 64  # dimension of the feed-forward network inside the Transformer
max_len = 20  # maximum input sequence length
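One constraint worth making explicit: the head-splitting in TransformerEncoderLayerCustom silently assumes that d_model is divisible by nhead (32 / 2 = 16 here). A defensive check, added for illustration:

assert d_model % nhead == 0, "d_model must be divisible by nhead"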
# Instantiate the model
model = AutoregressiveTransformer(vocab_size, d_model, nhead, num_encoder_layers, dim_feedforward, max_len)
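To get a rough sense of the model's size, you can count its trainable parameters (an optional check, not in the original script):

num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Trainable parameters: {num_params}")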
# Generate the training data
train_iter = AG_NEWS(split='train')  # re-create the iterator; the vocabulary pass consumed it
data = []
for (label, line) in train_iter:
    tokens = [vocab[token] for token in tokenizer(line)]
    # Truncate long samples and pad short ones to exactly max_len tokens
    if len(tokens) >= max_len:
        tokens = tokens[:max_len]
    else:
        tokens = tokens + [vocab['<pad>']] * (max_len - len(tokens))
    data.append(torch.tensor(tokens))
    if len(data) >= 100:  # use 100 samples for this demo
        break
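Each sample is now a fixed-length tensor of token ids, which a quick print confirms:

print(len(data), data[0].shape)  # 100 torch.Size([20])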
# Train the model
train(model, data, vocab_size, num_epochs=10)

# Generate a sequence
start_token = vocab['<unk>']  # start token for sequence generation
generated_sequence = predict(model, start_token, max_len, vocab_size)
print("Generated Sequence:", generated_sequence)