本文目录
- 前述
- 一、环境依赖
- 二、数据准备
- 1. 数据加载
- 程序解析
- word_tokenize()将字符串分割为一个个的单词,并由列表保存。
- 2. 构建单词表
- 程序解析
- (1)将列表里每个子列表的所有单词合并到一个新列表(没有子列表)中。
- (2)Counter()-- 统计列表(迭代对象)各元素出现次数,并按次数从多到少排序。
- (3)获取出现频率最高的前 50000 个元素及其个数,返回一个含有多个元组的列表。
- (4) 将含有多个元组的列表,转为字典。
- (5) 将字典的元素和索引互换位置。
- 3. 将英文、中文单词列表转为单词索引列表
- 程序解析
- (1)将en句子中的各个单词在字典里的索引,组成一个索引列表。
- (2)将句子的索引列表按照句子长度进行排序--从短到长。
- 4. 获取所有语句中的最大长度,如果语句小于该长度则填充0。
- 5. 将数据按照batch进行划分
- 三、模型搭建
前述
基础请查看:Transformer基础查看地址!
一、环境依赖
nltk==3.5
numpy==1.18.5
seaborn==0.11.1
matplotlib==3.3.2
psyco==1.6
zhtools==0.0.5#torch==1.12.1 安装torch时使用下面的命令
pip install torch==1.12.1+cu113 torchvision==0.13.1+cu113 torchaudio==0.12.1 --extra-index-url https://download.pytorch.org/whl/cu113 -i https://pypi.tuna.tsinghua.edu.cn/simple
代码导入包 :
import copy
import math
import matplotlib.pyplot as plt
import numpy as np
import os
import seaborn as sns
import time
import torch
import torch.nn as nn
import torch.nn.functional as Ffrom collections import Counter
from langconv import Converter
from nltk import word_tokenize
from torch.autograd import Variable
二、数据准备
数据集可以去网络上下载,下面的是train.txt文件部分内容,前面为英文,后面为繁体中文,中间以'\t'
隔开。其他数据文件也相同。
这里数据集是英文和繁体中文,所以第一步我们需要将繁体中文变为简体中文。
繁体字转简体字转换代码如下:
from langconv import Convertersentences="愛車"def cht_to_chs(sentences):
"""" zh-hans" 是一个语言代码,用于指代中文(汉语)的简体字形式。在国际化和本地化领域,语言代码用于标识特定语言或语言变体。在这里,"zh" 表示汉语(中文),"hans" 表示简体字形式。因此,"zh-hans" 表示简体中文。"""sentences= Converter("zh-hans").convert(sentences) sentences= sent.encode("utf-8")return sentencesprint(cht_to_chs(sentences))
1. 数据加载
作用:读取数据路径下的完整句子,将每个句子分割为一个一个的单词,并存到子列表中。返回含有子列表的列表,
"""参数参数path 为数据的路径,如下train_file= 'nmt/en-cn/train.txt' # 训练集dev_file= "nmt/en-cn/dev.txt" # 验证集load_data(train_file)
"""def load_data(path):"""读取英文、中文数据对每条样本分词并构建包含起始符和终止符的单词列表"""en = [] #定义英文列表cn = [] #定义中文列表with open(path, mode="r", encoding="utf-8") as f: #只读的形式打开文件路径,文件描述符为f。for line in f.readlines(): #按行读取sent_en, sent_cn = line.strip().split("\t") #以‘\t’进行分割,前面的赋给sent_en,后面的赋给sent_cn 。sent_en = sent_en.lower() #将英文转换为小写。sent_cn = cht_to_chs(sent_cn) #将繁体中文转为简体中文。sent_en = ["BOS"] + word_tokenize(sent_en) + ["EOS"]# 中文按字符切分sent_cn = ["BOS"] + [char for char in sent_cn] + ["EOS"]en.append(sent_en) #将切割好的英文 存入英文列表。包含['BOS', 'i', 'love', 'you', 'EOS']cn.append(sent_cn) #将切割好的中文 存入中文列表。return en, cn #返回两个单词列表"""
输出列表格式如下:en = [['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']]
"""
程序解析
word_tokenize()将字符串分割为一个个的单词,并由列表保存。
from nltk import word_tokenizesent_en="He knows better than to marry her."
print(word_tokenize(sent_en))# 输出为:['He', 'knows', 'better', 'than', 'to', 'marry', 'her', '.']
2. 构建单词表
"""输入sentences= [['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']]
"""PAD = 0 # padding占位符的索引
UNK = 1 # 未登录词标识符的索引
def build_dict(sentences, max_words=5e4):"""构造分词后的列表数据构建单词-索引映射(key为单词,value为id值)"""# 统计数据集中单词词频word_count = Counter([word for sent in sentences for word in sent])# 按词频保留前max_words个单词构建词典# 添加UNK和PAD两个单词ls = word_count.most_common(int(max_words))total_words = len(ls) + 2word_dict = {word [0]: index + 2 for index, word in enumerate(ls)}word_dict['UNK'] = UNKword_dict['PAD'] = PAD# 构建id2word映射index_dict = {v: k for k, v in word_dict.items()}return word_dict, total_words, index_dict"""
输出:word_dict ={'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12, 'UNK': 1, 'PAD': 0}index_dict= {2: 'BOS', 3: 'language', 4: 'processing', 5: '.', 6: 'EOS', 7: 'I', 8: 'love', 9: 'natural', 10: 'Natural', 11: 'is', 12: 'fascinating', 1: 'UNK', 0: 'PAD'}
"""
程序解析
(1)将列表里每个子列表的所有单词合并到一个新列表(没有子列表)中。
将sentences里面每句话的每个单词组合形成一个新的列表。
sentences = [['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']]word_list = [word for sent in sentences for word in sent]
"""
另一种写法:word_list = []for sent in sentences:for word in sent:word_list.append(word)
"""
print(word_list )
"""输出: ['BOS', 'I', 'love', 'natural', 'language', 'processing', '.', 'EOS', 'BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
"""
(2)Counter()-- 统计列表(迭代对象)各元素出现次数,并按次数从多到少排序。
from collections import Counter
#Python 中的一个内置数据结构
# 定义一个列表
word_list = ['BOS', 'I', 'love', 'natural', 'language', 'processing', '.', 'EOS', 'BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']
# 使用 Counter 统计列表中各元素的出现次数
word_count = Counter(word_list)
print(word_count )"""输出: Counter({'BOS': 2, 'language': 2, 'processing': 2, '.': 2, 'EOS': 2, 'I': 1, 'love': 1, 'natural': 1, 'Natural': 1, 'is': 1, 'fascinating': 1})
"""
(3)获取出现频率最高的前 50000 个元素及其个数,返回一个含有多个元组的列表。
如: [ (元素1,频次),(元素2,频次),… ]
from collections import Counterword_count = Counter({'BOS': 2, 'language': 2, 'processing': 2, '.': 2, 'EOS': 2, 'I': 1, 'love': 1, 'natural': 1, 'Natural': 1, 'is': 1, 'fascinating': 1})ls = word_count.most_common(int(5e4))#返回列表中频率最高的元素和它们的计数,按照计数从高到低排序。频率最高的前 50000 个元素。
print(ls)
"""
输出:[('BOS', 2), ('language', 2), ('processing', 2), ('.', 2), ('EOS', 2), ('I', 1), ('love', 1), ('natural', 1), ('Natural', 1), ('is', 1), ('fascinating', 1)]"""
(4) 将含有多个元组的列表,转为字典。
如:word_dict ={元素1:索引,元素2,索引…}
enumerate(可迭代元素),返回的第一个值为索引,第二个值为元素。
ls = [('BOS', 2), ('language', 2), ('processing', 2), ('.', 2), ('EOS', 2), ('I', 1), ('love', 1), ('natural', 1), ('Natural', 1), ('is', 1), ('fascinating', 1)]word_dict = {word [0]: index + 2 for index, word in enumerate(ls)}
# word是('BOS',2), word[0]='BOS' , word[1]=2
#index 是该元组的索引,也就是 0
"""另一种写法:word_dict = {}for index, word in enumerate(ls):word_dict[ word[0] ] = index + 2print(word_dict)
"""
print(word_dict) #存放元素及其索引号
"""输出: {'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12}
"""word_dict['UNK'] = 1
word_dict['PAD'] = 0
print(word_dict)
"""输出:{'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12, 'UNK': 1, 'PAD': 0}
"""
(5) 将字典的元素和索引互换位置。
word_dict= {'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12, 'UNK': 1, 'PAD': 0}index_dict = {v: k for k, v in word_dict.items()}
print(index_dict)"""输出:{2: 'BOS', 3: 'language', 4: 'processing', 5: '.', 6: 'EOS', 7: 'I', 8: 'love', 9: 'natural', 10: 'Natural', 11: 'is', 12: 'fascinating', 1: 'UNK', 0: 'PAD'}
"""
3. 将英文、中文单词列表转为单词索引列表
"""
输入:en = [['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']]
cn= 同上格式相同en_dict= {'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12, 'UNK': 1, 'PAD': 0 }cn_dict= 同上格式相同
"""def word2id(en, cn, en_dict, cn_dict, sort=True):"""将英文、中文单词列表转为单词索引列表`sort=True`表示以英文语句长度排序,以便按批次填充时,同批次语句填充尽量少"""length = len(en) #计算长度# 单词映射为索引out_en_ids = [[en_dict.get(word, UNK) for word in sent] for sent in en]out_cn_ids = [[cn_dict.get(word, UNK) for word in sent] for sent in cn]# 按照语句长度排序def len_argsort(seq):"""传入一系列语句数据(分好词的列表形式),按照语句长度排序后,返回排序后原来各语句在数据中的索引下标"""return sorted(range(len(seq)), key=lambda x: len(seq[x]))# 按相同顺序对中文、英文样本排序if sort:# 以英文语句长度排序sorted_index = len_argsort(out_en_ids)out_en_ids = [out_en_ids[idx] for idx in sorted_index]out_cn_ids = [out_cn_ids[idx] for idx in sorted_index]return out_en_ids, out_cn_ids"""
输出:
out_en_ids= [ [2, 7, 8, 9, 3, 4, 5, 6],[2, 10, 3, 4, 11, 12, 5, 6] ]
out_cn_ids= 同上格式相同
"""
程序解析
(1)将en句子中的各个单词在字典里的索引,组成一个索引列表。
from nltk import word_tokenizePAD = 0 # padding占位符的索引
UNK = 1 # 未登录词标识符的索引en = [['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,['BOS', 'Natural', 'language', 'processing', 'is', 'fascinating', '.', 'EOS']]
en_dict= {'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'Natural': 10, 'is': 11, 'fascinating': 12, 'UNK': 1, 'PAD': 0 }out_en_ids = [[en_dict.get(word, UNK) for word in sent] for sent in en]
"""
另一种写法:out_en_ids = []for sent in en:# 将句子中的每个单词转换为对应的单词ID# 如果单词不在字典中,则使用UNK表示word_ids = []for word in sent:word_id = en_dict.get(word, UNK)word_ids.append(word_id)out_en_ids.append(word_ids)
"""
print(out_en_ids)
"""输出: [ [2, 7, 8, 9, 3, 4, 5, 6],[2, 10, 3, 4, 11, 12, 5, 6] ]
2:en第一个句子的第一个单词'BOS'在en_dict字典的索引值为2.
7:en第一个句子的第二个单词'I'在en_dict字典的索引值为7.
8:en第一个句子的第三个单词'love'在en_dict字典的索引值为8.
......
"""
(2)将句子的索引列表按照句子长度进行排序–从短到长。
作用: 以便后续分batch做padding时,同批次各句子需要padding的长度相近减少padding量。
from nltk import word_tokenizeUNK=1
en = [['BOS', 'I', 'love', 'natural', 'language', 'processing', '.', 'EOS'],['BOS', 'This', 'is', 'a', 'test', 'sentence', '.', 'EOS','I','love'],['BOS', 'test', '.', 'EOS']
]
en_dict = {'BOS': 2, 'language': 3, 'processing': 4, '.': 5, 'EOS': 6, 'I': 7, 'love': 8, 'natural': 9, 'This': 10, 'is': 11, 'a': 12, 'test': 13, 'sentence': 14, 'UNK': 1, 'PAD': 0}def word2id(en, en_dict, sort=True):length = len(en) # 计算长度out_en_ids = [[en_dict.get(word, UNK) for word in sent] for sent in en]"""out_en_ids = [ [2, 7, 8, 9, 3, 4, 5, 6],[2, 10, 3, 4, 11, 12, 5, 6] ]"""# 按照语句长度排序def len_argsort(seq):return sorted(range(len(seq)), key=lambda x: len(seq[x]))if sort:sorted_index = len_argsort(out_en_ids) #获得按句子长度排序后的索引号,如sorted_index =[2, 0, 1]out_en_ids = [out_en_ids[idx] for idx in sorted_index] #按照排序后的索引列表的索引,将索引列表排序。return out_en_idsprint(word2id(en,en_dict))"""
sort=False 索引列表按en句子的顺序排列out_en_ids= [[2, 7, 8, 9, 3, 4, 5, 6], [2, 10, 11, 12, 13, 14, 5, 6, 7, 8], [2, 13, 5, 6] ]sort=True 索引列表按en句子的长度排列,从短到长排列out_en_ids= [[2, 13, 5, 6], [2, 7, 8, 9, 3, 4, 5, 6], [2, 10, 11, 12, 13, 14, 5, 6, 7, 8] ]
"""
4. 获取所有语句中的最大长度,如果语句小于该长度则填充0。
"""
输入:sentences= [['BOS','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,['BOS', 'Natural', 'language']]"""PAD=0 #填充元素
def seq_padding(sentences, padding=PAD):"""按批次(batch)对数据填充、长度对齐"""# 计算该批次各条样本语句长度Length = [len(x) for x in sentences]# 获取该批次样本中语句长度最大值MaxLength= max(Length )# 遍历该批次样本,如果语句长度小于最大长度,则用padding填充return np.array([np.concatenate([x, [padding] * (MaxLength - len(x))]) if len(x) < MaxLength else x for x in sentences])"""---------------------写法二: """
def seq_padding(sentences, padding=PAD):"""按批次(batch)对数据填充、长度对齐"""# 计算该批次各条样本语句长度Length = [len(x) for x in X]# 获取该批次样本中语句长度最大值MaxLength = max(Length)# 遍历该批次样本,如果语句长度小于最大长度,则用padding填充padded_X = []for x in X:if len(x) < MaxLength:padded_seq = np.concatenate([x, [padding] * (ML - len(x))])else:padded_seq = xpadded_X.append(padded_seq)return np.array(padded_X)print(seq_padding(sentences))
"""
输出:[['BOS' 'I' 'love' 'natural' 'language' 'processing' '.' 'EOS']['BOS' 'Natural' 'language' '0' '0' '0' '0' '0']]
"""
5. 将数据按照batch进行划分
"""假设输入en = [['BOS0','I', 'love', 'natural', 'language', 'processing', '.', 'EOS'] ,['BOS1', 'Natural'],['BOS2', 'Natural','wd','a2rra'],['BOS3', 'Natural'],['BOS4', 'Natural'],['BOS5', 'Natural','afw'],['BOS6', 'Natural'],['BOS7', 'Natural','wda','aw4'],['BOS8', 'Natural'],['BOS9', 'Natural']]batch_size=3shuffle=True ,是否随机打乱顺序"""
def split_batch(en, cn, batch_size, shuffle=True):"""划分批次`shuffle=True`表示对各批次顺序随机打乱"""# 每隔batch_size取一个索引作为后续batch的起始索引idx_list = np.arange(0, len(en), batch_size) #idx_list = [0,3,6,9]# 起始索引随机打乱if shuffle:np.random.shuffle(idx_list) #idx_list = [6,3,0,9] 随机打乱顺序# 存放所有批次的语句索引batch_indexs = []for idx in idx_list: #idx_list = [6,3,0,9] # 起始索引最大的批次可能发生越界,要限定其索引batch_indexs.append(np.arange( idx, min(idx + batch_size, len(en)) ))"""batch_indexs=[array([6, 7, 8]), array([3, 4, 5]), array([0, 1, 2]), array([9])]""" # 构建批次列表batches = []for batch_index in batch_indexs:# 按当前批次的样本索引采样batch_en = [en[index] for index in batch_index]batch_cn = [cn[index] for index in batch_index]
"""第一个 batch_en = [ ['BOS6', 'Natural'], ['BOS7', 'Natural','wda','aw4'], ['BOS8', 'Natural'] ]第二个 batch_en = [ ['BOS3', 'Natural'], ['BOS4', 'Natural'],['BOS5', 'Natural','afw'] ]
"""# 对当前批次中所有语句填充、对齐长度# 维度为:batch_size * 当前批次中语句的最大长度batch_cn = seq_padding(batch_cn)batch_en = seq_padding(batch_en)
"""
按照批次中最长语句的长度进行填充第一个 batch_en = [ ['BOS6' 'Natural' '0' '0'], ['BOS7' 'Natural' 'wda' 'aw4'], ['BOS8' 'Natural' '0' '0'] ]第二个 batch_en = [ ['BOS3' 'Natural' '0'], ['BOS4' 'Natural' '0'], ['BOS5' 'Natural' 'afw'] ]
""" # 将当前批次添加到批次列表# Batch类用于实现注意力掩码batches.append(Batch(batch_en, batch_cn))return batches
class Batch:"""批次类1. 输入序列(源)2. 输出序列(目标)3. 构造掩码"""def __init__(self, src, trg=None, pad=PAD):# 将输入、输出单词id表示的数据规范成整数类型src = torch.from_numpy(src).to(DEVICE).long()trg = torch.from_numpy(trg).to(DEVICE).long()self.src = src# 对于当前输入的语句非空部分进行判断,bool序列# 并在seq length前面增加一维,形成维度为 1×seq length 的矩阵self.src_mask = (src != pad).unsqueeze(-2)# 如果输出目标不为空,则需要对解码器使用的目标语句进行掩码if trg is not None:# 解码器使用的目标输入部分self.trg = trg[:, : -1] # 去除最后一列# 解码器训练时应预测输出的目标结果self.trg_y = trg[:, 1:] #去除第一列的SOS# 将目标输入部分进行注意力掩码self.trg_mask = self.make_std_mask(self.trg, pad)# 将应输出的目标结果中实际的词数进行统计self.ntokens = (self.trg_y != pad).data.sum()# 掩码操作@staticmethoddef make_std_mask(tgt, pad):"Create a mask to hide padding and future words."tgt_mask = (tgt != pad).unsqueeze(-2)tgt_mask = tgt_mask & Variable(subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data))return tgt_mask