PET项目——新零售决策评价系统(上)
- 0 前言
- 1 项目介绍
- 1.1 PET简介
- 1.2 项目背景
- 1.3 项目结构
- 1.4 硬件配置
- 2 数据处理
- 2.1 数据介绍
- 2.2 提示词模板与标签映射
- 2.3 BERT模型的输入格式
- 2.4 硬模板类
- 2.5 函数式编程
- 2.6 datasets模块
- 主要功能:
- 在本项目中的使用:
- 2.7 数据集加载与批处理
- 2.8 数据集导入器
- 3 模型配置与推理
- 3.1 配置文件
- 3.2 模型推理
- 3.3 convert_logits_to_ids函数
- 3.4 标签映射类(第一部分)
- 4 模型训练(微调)
- 4.1 标签映射类(第二部分)
- 4.2 损失函数
- 4.3 模型的评估指标类
- 4.4 模型微调
0 前言
上篇文章我们介绍了使用全量微调构建医疗问诊机器人,比较常用的微调还有提示词微调(Prompt微调)、PEFT,而Prompt微调用的比较多有PET和P-Tuning,而我们今天介绍的项目,就是使用PET的方式微调大模型。
1 项目介绍
1.1 PET简介
PET的全称是Pattern-Exploiting Training,主要用来做文本分类(Prompt微调基本都是文本分两类),它是通过构建提示词模板(硬模版,需要我们自己指定模板)的方式,将下游任务转化为一个完形填空任务,这样就可以用BERT的MLM模型来进行预测了。
1.2 项目背景
智能推荐系统是AI技术在新零售中 最为常见且有效的应用之一,通过分析用户的购买历史、浏览行为以及喜好偏好,推荐系统可以根据个人特征给用户进行个性化商品推荐。这种个性化推荐不仅可以提高用户购买意愿,减少信息过载,还可以带来更高的用户满意度和销量。
在智能推荐系统中,文本分类的应用属于重要的应用环节。比如:某电商网站都允许用户为商品填写评论,这些文本评论能够体现出用户的偏好以及商品特征信息。因此,本次项目我们将以“电商平台用户评论”为背景,基于深度学习方法实现评论文本的准确分类,这样做的目的是通过用户对不同商品或服务的评价,平台能够快速回应用户需求,改进产品和服务。同时,自动分类也为个性化推荐奠定基础,帮助用户更轻松地找到符合其偏好的商品。
本项目要做的就是对用户的评论进行分类,当输入用户的评论后,输出这是对什么商品的评论,比如输入为“包装不错,苹果挺甜的,个头也大。”,输出“苹果”。
本项目我们使用BERT模型作为基础模型。
1.3 项目结构
我们把代码和预训练模型放到两个不同的目录中
其中代码在PET目录下面,结构如下:
1.4 硬件配置
硬件我们使用FunHPC云算力市场上的RTX 3080显卡,显存为12G,关于FunHPC云算力的使用,可以参考这篇文章。
2 数据处理
个人认为,本项目最难、最繁琐的部分就是数据处理了,特别是之前没有开发过和BERT相关的项目。这里需要铺垫的知识有点多,我尽量在用到之前,对其进行讲解。
2.1 数据介绍
训练集和验证集分别在data目录的train.txt与dev.txt中,其中 train.txt 内容如下:
train.txt一共包含63条样本数据,dev.txt一共包含590条样本数据,每一行用\t
分开,前半部分为标签(label),后半部分为原始输入 (用户评论),如果想使用自定义数据训练,只需要仿照上述示例数据构建数据集即可。
训练集数据少是由prompt微调的思想决定的,微调的时候更多地是使用预训练模型地能力,验证集数据多是为了保证评估结果的可信度,因此需要比较多的数据对模型进行充分评估。
2.2 提示词模板与标签映射
data目录下的prompt.txt中保存的是提示词模板,内容如下:
这是一条{MASK}评论:{textA}。
data目录下的verbalizer.txt中保存的是主标签与子标签的对应关系,内容如下:
电脑 电脑
水果 水果,苹果,香蕉,西瓜,西瓜
平板 平板
衣服 衣服,裙子,西装
酒店 酒店
洗浴 洗浴
书籍 书籍
蒙牛 蒙牛
手机 手机
电器 电器
这里第一列是主标签,第二列是子标签,我们的项目中,主标签和子标签是一致的。但很多情况下,两者是不一致的。
例如,以新闻分类为例:
某条新闻的标题为:中国女排再夺冠!
我们构建的PET模板为:下面是[MASK][MASK]
新闻:textA
,Label:体育/财经/时政/军事。
如果使用标签去填充,那么构建的提示词为:下面是[MASK][MASK]
新闻:中国女排再夺冠!
如果让模型去预测[MASK][MASK]
遮盖的内容,那么很可能得到排球、女排、体育等词汇,它有可能不在我们要预测的范围内(体育/财经/时政/军事),所以需要对标签进行映射,将排球、女排、体育都映射成体育。
训练时,以子标签作为目标值,损失函数的计算也是通过对比预测值与子标签的差距来进行;计算各种评估指标时(例如准确率、召回率等)则是对比主标签(即把预测值转换为主标签,然后计算)。
2.3 BERT模型的输入格式
如果不了解BERT模型的输入格式,那么很难理解本项目的数据处理过程。
BERT(Bidirectional Encoder Representations from Transformers)模型的输入通常包含以下几个部分,这些部分共同构成了一个完整的输入格式:
-
input_ids
:- 含义:这是输入文本经过分词器(如
AutoTokenizer
)编码后的 token ID 列表。 - 作用:用于表示输入文本中的每个 token(单词或子词)。BERT 模型通过这些 IDs 来查找对应的词向量。
- 含义:这是输入文本经过分词器(如
-
attention_mask
:- 含义:这是一个与
input_ids
长度相同的二进制列表,用于标识哪些位置是有效的 token(值为 1),哪些位置是填充的 token(值为 0)。 - 作用:帮助模型忽略填充的部分,只关注实际的输入内容。这对于处理不同长度的输入序列非常重要。
- 含义:这是一个与
-
token_type_ids
:- 含义:这是一个与
input_ids
长度相同的整数列表,用于区分不同的句子或文本片段。BERT 支持处理多句输入(例如:[CLS] 句子A [SEP] 句子B [SEP]
),此时需要通过token_type_ids
来标识每个 token 属于哪个句子。 - 作用:在多句输入的情况下,帮助模型理解哪些 token 属于哪个句子。例如,在问答任务中,
token_type_ids
可以区分问题和上下文。 - 如果输入是单句
"这是一条[MASK]评论:这个手机也太卡了"
,那么token_type_ids
将全部为 0。 - 如果输入是两个句子
[CLS]句子A[SEP]句子B[SEP]
,那么token_type_ids
的前半部分为 0,后半部分为 1。
- 含义:这是一个与
-
position_ids
(可选):- 含义:这是每个 token 在输入序列中的位置索引。
- 例如"这是一条
[MASK]
评论:这个手机也太卡了",假设经过分词器处理后,输入文本被编码为以下 token 列表(包括特殊token [CLS]
和[SEP]
),对应的position_ids
就是这些 token 在输入序列中的位置索引:
input_tokens = ['[CLS]', '这', '是', '一', '条', '[MASK]', '评', '论', ':', '这', '个', '手', '机', '也', '太', '卡', '了', '[SEP]']
position_ids = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
mask_position
(特定任务使用):- 含义:这是一个标记
[MASK]
位置的列表,用于指示哪些位置是被遮盖的 token。 - 作用:在 MLM(Masked Language Model)任务中,模型需要知道哪些位置是被遮盖的,以便进行预测。
[MASK]
出现在第 6 个位置(从 0 开始计数),因此 mask_position 为:
- 含义:这是一个标记
mask_position = [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
对于大多数 BERT 应用场景,输入至少需要包含 input_ids
和 attention_mask
。如果涉及多句输入,则还需要 token_type_ids
,而 mask_position
则是特定任务(如 MLM)中使用的辅助信息。我们当前的分类任务,就是被 [MASK]
遮盖的商品类别,所以是MLM任务,需要mask_position
。
2.4 硬模板类
所谓硬模板,指的是人工设计的模板,或者说开发人员/产品经理设计的模板,除了硬模板,还有软模板,软模版是大模型生成的模板,关于软模版,我们这里不展开,后面的文章会介绍。
data_handle目录下的template.py中写了一个名为HardTemplate的类,它有两个功能:一是将样本数据填充到模板里面,构成提示词;二是将提示词按照BERT模型的输入格式进行转化。
template.py的内容如下:
# -*- coding:utf-8 -*-
from transformers import AutoTokenizer
import numpy as npclass HardTemplate(object):"""硬模板,人工定义句子和[MASK]之间的位置关系。"""def __init__(self, prompt: str):"""Args:prompt (str): prompt格式定义字符串,它是提示词模板e.g. -> "这是一条{MASK}评论:{textA}。",两个花括号中的内容是自定义字段,目前用 MASK 和 textA 进行占位"""self.prompt = promptself.inputs_list = [] # 用于填充prompt的输入列表,获取模板分词后的结果self.custom_tokens = set(['MASK']) # 从prompt中解析出的自定义token,比如'MASK'和'textA',即花括号中的内容self.prompt_analysis() # 解析prompt模板def prompt_analysis(self):"""将prompt文字模板拆解为可映射的数据结构。Examples:prompt -> "这是一条{MASK}评论:{textA}。"该函数执行后inputs_list -> ['这', '是', '一', '条', 'MASK', '评', '论', ':', 'textA', '。']custom_tokens -> {'textA', 'MASK'},这是一个集合"""idx = 0while idx < len(self.prompt):str_part = ''if self.prompt[idx] not in ['{', '}']:self.inputs_list.append(self.prompt[idx])if self.prompt[idx] == '{': # 进入自定义字段idx += 1while self.prompt[idx] != '}': # 右边的花括号是在这个分支中遍历完str_part += self.prompt[idx] # 拼接该自定义字段的值idx += 1elif self.prompt[idx] == '}': # 右边花括号如果出现在这个分支中,则说明花括号不匹配,报错raise ValueError("Unmatched bracket '}', check your prompt.")if str_part:self.inputs_list.append(str_part)self.custom_tokens.add(str_part) # 将所有自定义字段存储,后续会检测输入信息是否完整idx += 1def __call__(self,inputs_dict: dict,tokenizer,mask_length,max_seq_len=512):"""输入一个样本,将其填充到硬模板当中,并将其转化为Bert模型的输入格式Bert模型的输入包含 input_ids、attention_mask、token_type_ids、position_ids(可选)、mask_position(特定任务使用)Args:inputs_dict (dict): prompt中的参数字典, e.g. -> {"textA": "这个手机也太卡了", "MASK": "[MASK]"}tokenizer: 用于encoding文本mask_length (int): MASK token 的长度,即掩码的字符长度Returns:dict -> {'text': '[CLS]这是一条[MASK]评论:这个手机也太卡了。[SEP]','input_ids': [1, 47, 10, 7, 304, 3, 480, 279, 74, 47, 27, 247, 98, 105, 512, 777, 15, 12043, 2],'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], # 填充后的提示词,哪些位置是需要注意的,对于不需要注意的,即用[PAD]填充的,则置0'mask_position': [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] # 在[MASK]填充的位置置1}"""# 定义输出格式(Bert模型的接收格式)outputs = {'text': '', 'input_ids': [],'token_type_ids': [],'attention_mask': [],'mask_position': []}# 构建输入到模型的字符串,即将样本信息填充到模板str_formated = ''for value in self.inputs_list:if value in self.custom_tokens:if value == 'MASK':# 用inputs_dict中,'MASK'键对应的值(本项目对应的值为'[MASK]'),替换模板中的'MASK'字段# 并且根据mask_length,将MASK字段重复mask_length次,拼接起来str_formated += inputs_dict[value] * mask_lengthelse:# 这个分支中,value == 'textA',即将样本信息填充到模板中str_formated += inputs_dict[value]else:str_formated += value# print(f'str_formated-->{str_formated}')# 将填充后的字符串,用 tokenizer 进行编码,得到模型的输入格式encoded = tokenizer(text=str_formated,truncation=True,max_length=max_seq_len,padding='max_length')# print(f'encoded--->{encoded}')outputs['input_ids'] = encoded['input_ids'] # encoded['input_ids']有起始符、分隔符和填充符,分词器给加上的outputs['token_type_ids'] = encoded['token_type_ids']outputs['attention_mask'] = encoded['attention_mask']# 将input_ids从词表中的索引转化为对应的token,方便阅读outputs['text'] = ''.join(tokenizer.convert_ids_to_tokens(encoded['input_ids']))# 将'[MASK]'转化为词表中的索引,方便后续使用mask_token_id = tokenizer.convert_tokens_to_ids(['[MASK]'])[0]# 获取mask_position,即[MASK]的位置mask_position = np.where(np.array(outputs['input_ids']) == mask_token_id)[0].tolist()outputs['mask_position'] = mask_positionreturn outputsif __name__ == '__main__':# 创建分词器tokenizer = AutoTokenizer.from_pretrained('../../预训练模型/bert-base-chinese')# print(tokenizer.convert_ids_to_tokens([3819, 3352, 3819, 3352]))# print(tokenizer.convert_tokens_to_ids(['网', '球']))# 创建硬模板对象hard_template = HardTemplate(prompt='这是一条{MASK}评论:{textA}')# print(hard_template.inputs_list)# print(hard_template.custom_tokens)# 往硬模板内填充内容tep = hard_template(inputs_dict={'textA': '包装不错,苹果挺甜的,个头也大。', 'MASK': '[MASK]'},tokenizer=tokenizer,max_seq_len=30,mask_length=2)# 打印for key, value in tep.items():print(f'{key}: {value}')
对BERT模型的输入格式有所了解后,这个脚本的代码理解起来就比较容易。输出:
text: [CLS]这是一条[MASK][MASK]评论:包装不错,苹果挺甜的,个头也大。[SEP][PAD][PAD][PAD]
input_ids: [101, 6821, 3221, 671, 3340, 103, 103, 6397, 6389, 8038, 1259, 6163, 679, 7231, 8024, 5741, 3362, 2923, 4494, 4638, 8024, 702, 1928, 738, 1920, 511, 102, 0, 0, 0]
token_type_ids: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
attention_mask: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0]
mask_position: [5, 6]
2.5 函数式编程
按理说,函数式编程和数据处理关系并不大,但data_handle目录下的data_preprocess.py用到了,所以这里讲一下。
functools.partial
是 Python 标准库 functools
模块中的一个工具,用于部分应用函数(partial function application),它允许你固定某些参数的值,从而创建一个新的函数。
以下是 functools.partial
的用法和特点:
- 基本语法
from functools import partialnew_function = partial(function, *args, **kwargs)
function
: 要部分应用的原始函数。*args
: 固定的位置参数。**kwargs
: 固定的关键字参数。new_function
: 返回的新函数,其中部分参数已经被固定。
通过 partial
,你可以将一个函数的部分参数预先绑定,生成一个新的函数。新函数在调用时只需传递剩余的参数即可。
- 示例1:固定位置参数
from functools import partialdef add(x, y):return x + y# 创建一个新函数,固定第一个参数为5
add_five = partial(add, 5)print(add_five(10)) # 输出: 15
- 示例2:固定关键字参数
from functools import partialdef greet(name, greeting="Hello"):return f"{greeting}, {name}!"# 创建一个新函数,固定greeting为"Hi"
hi_greet = partial(greet, greeting="Hi")print(hi_greet("Alice")) # 输出: Hi, Alice!
- 示例 3:结合位置参数和关键字参数
from functools import partialdef multiply(x, y, z):return x * y * z# 创建一个新函数,固定x=2, y=3
multiply_by_2_and_3 = partial(multiply, 2, 3)print(multiply_by_2_and_3(4)) # 输出: 24
2.6 datasets模块
同样的,data_handle目录下的data_preprocess.py也用到了datasets模块,它是 Hugging Face 提供的一个用于处理机器学习和深度学习任务中数据集的 Python 库。它简化了从各种来源加载、预处理和迭代数据的过程,支持多种格式的数据集,并且与 Hugging Face 的 transformers
库无缝集成。
安装命令:pip install datasets
主要功能:
-
加载数据集:
- 可以从本地文件或远程资源加载数据集。
- 支持多种格式,如 CSV、JSON、文本文件等。
- 内置了许多常用数据集,可以直接通过名称加载。
-
数据预处理:
- 提供了丰富的 API 来对数据进行预处理,如分词、编码、填充等。
- 支持批量处理(batched),可以显著提高处理效率。
new_dataset = dataset.map(function, batched=False, batch_size=None, num_proc=1, ...)
- 参数说明如下:
- function: 要应用到每个样本上的函数。该函数应该接受一个字典作为输入,并返回一个字典。
- batched (bool): 是否以批次的方式处理数据。如果为 True,则 function 将接收一批样本(即包含多个样本的字典),而不是单个样本。
- batch_size (int): 如果 batched=True,则指定每批次的样本数量。默认情况下,batch_size=1000。
- num_proc (int): 并行处理的进程数。设置为大于 1 的值可以加速处理过程。
- 其他参数: 还有其他一些参数,如 remove_columns、load_from_cache_file 等,可以根据需要使用。
-
数据迭代(了解即可):
- 提供了灵活的迭代器,方便在训练过程中逐批次获取数据。
- 支持分布式训练中的数据分片。
-
数据增强(了解即可):
- 可以通过自定义函数对数据进行增强或转换。
在本项目中的使用:
在本项目的代码中,datasets
模块主要用于以下几个方面:
- 加载数据集:
train_dataset = load_dataset('text', data_files=r'../data/train.txt')
这里使用了 load_dataset
函数从本地文本文件加载数据集。load_dataset
返回一个 DatasetDict
对象,其中包含不同分割(如训练集、验证集、测试集)的数据。
- 映射数据处理函数:
new_dataset = train_dataset.map(convert_func, batched=True)
使用 map
方法将 convert_func
函数应用到数据集的每个样本上,并返回一个新的数据集对象,batched=True
参数表示将整个数据集作为一个批次进行处理,这个方法非常高效,因为它可以在底层进行优化,例如并行处理和批处理。
至于convert_func是什么,我们稍后介绍。
2.7 数据集加载与批处理
data_handle目录下的data_preprocess.py,有了前面的知识铺垫,里面的代码理解起来会容易很多。内容如下:
import torch
import numpy as np
from functools import partial
from datasets import load_dataset
from transformers import AutoTokenizer
from template import HardTemplatedef convert_example(examples: dict,tokenizer,max_seq_len: int,max_label_len: int,hard_template: HardTemplate,train_mode=True,return_tensor=False) -> dict:"""将样本数据转换为模型接收的输入数据。Args:examples (dict): 训练数据样本, e.g. -> {"text": ['手机 这个手机也太卡了。','体育 世界杯为何迟迟不见宣传',...]}max_seq_len (int): 句子的最大长度,若没有达到最大长度,则padding为最大长度max_label_len (int): 最大label长度,若没有达到最大长度,则padding为最大长度hard_template (HardTemplate): 模板类。train_mode (bool): 训练阶段 or 推理阶段。return_tensor (bool): 是否返回tensor类型,如不是,则返回numpy类型。Returns:dict (str: np.array) -> tokenized_output = {'input_ids': [[1, 47, 10, 7, 304, 3, 3, 3, 3, 47, 27, 247, 98, 105, 512, 777, 15, 12043, 2], ...],'token_type_ids': [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], ...],'attention_mask': [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], ...],'mask_positions': [[5, 6, 7, 8], ...],'mask_labels': [[2372, 3442, 0, 0], [2643, 4434, 2334, 0], ...]}"""# 定义输出格式(Bert模型的接收格式)tokenized_output = {'input_ids': [],'token_type_ids': [],'attention_mask': [],'mask_positions': [],'mask_labels': []}# 遍历样本数据,将样本填充到模板中,并转化为Bert模型的输入格式for i, example in enumerate(examples['text']):if train_mode:# 如果是训练模式,则既有样本的label,也有样本的文本内容label, content = example.strip().split('\t')else:# 如果是评估(推理)模式,则只有样本的文本内容content = example.strip()# 需要将每个样本整合成一个字典,这样硬模板对象才方便处理inputs_dict = {'textA': content,'MASK': '[MASK]'}# 将样本填充到硬模板当中,并将其转化为Bert模型的输入格式encoded_inputs = hard_template(inputs_dict=inputs_dict,tokenizer=tokenizer,max_seq_len=max_seq_len,mask_length=max_label_len)# 收集硬模板的处理结果tokenized_output['input_ids'].append(encoded_inputs["input_ids"])tokenized_output['token_type_ids'].append(encoded_inputs["token_type_ids"])tokenized_output['attention_mask'].append(encoded_inputs["attention_mask"])tokenized_output['mask_positions'].append(encoded_inputs["mask_position"])# 对于训练模式,则需要将label转化为Bert模型的输入格式if train_mode:# 将标签转化为idlabel_encoded = tokenizer(text=[label])# 因为分词器处理的结果是以[CLS]开头和[SEP]结尾的,所以需要将[CLS]和[SEP]去除label_encoded = label_encoded['input_ids'][0][1:-1] # 这里中间有个索引[0],是因为分词器输入标签时,也用了一层列表包装# 去掉[CLS]和[SEP]后,如果标签的长度大于max_label_len,则截断label_encoded = label_encoded[:max_label_len]# 如果标签的长度小于max_label_len,则填充pad_token_idlabel_encoded = label_encoded + [tokenizer.pad_token_id] * (max_label_len - len(label_encoded))# 收集处理后的标签tokenized_output['mask_labels'].append(label_encoded)# 将数据转化为torch.tensor或者numpy.array格式,方便后续处理for k, v in tokenized_output.items():if return_tensor:tokenized_output[k] = torch.LongTensor(v)else:tokenized_output[k] = np.array(v)return tokenized_outputif __name__ == '__main__':# 导入数据train_dataset = load_dataset('text', data_files=r'../data/train.txt')# load_dataset会返回一个datasets.dataset_dict.DatasetDict对象,该对象有一个名为train的键,train_dataset['train']是一个datasets.arrow_dataset.Dataset对象# train_dataset['train']['text']是一个列表,里面存储着每条样本的文本内容,原来txt文件中,每个非空行都会被当成一个样本# 创建分词器tokenizer = AutoTokenizer.from_pretrained('../../预训练模型/bert-base-chinese')# 创建硬模板对象hard_template = HardTemplate(prompt='这是一条{MASK}评论:{textA}')# 函数式编程convert_func = partial(convert_example,tokenizer=tokenizer,hard_template=hard_template,max_seq_len=30,max_label_len=2,)# 这里将 `convert_example` 函数的部分参数(如 `tokenizer`, `hard_template`, `max_seq_len`, `max_label_len`)预先绑定,# 生成了一个新的函数 `convert_func`。这样,在后续调用 `convert_func` 时,只需要传递剩余的参数(如 `examples` 和其他可选参数)。# batched=True相当于将train_dataset看成一个批次的样本直接对数据进行处理,节省时间new_dataset = train_dataset.map(convert_func, batched=True) # dataset是一个datasets.dataset_dict.DatasetDict对象for value in new_dataset['train']:# value将是一个字典,其中包含输入的id、token_type_id、attention_mask、mask_position和mask_labelprint(type(value))for k, v in value.items():print(k, v)print(len(value['input_ids']))break
输出:
<class 'dict'>
text 电脑 (1)这款笔记本外观感觉挺漂亮的,分量吗,对我来说不算沉。 (2)安装了WindowsXP系统后,运行的速度挺快。发热量没有想象中那么大。可能尚未运行很耗资源的程序,没有感到内存的弊病。不过,1G的内存确实有点小。 (3)附赠的包很不错,挺有手感的。但是附赠的鼠标实在是太小了,幸好同时订了一个双飞燕的鼠标哟。
input_ids [101, 6821, 3221, 671, 3340, 103, 103, 6397, 6389, 8038, 113, 122, 114, 6821, 3621, 5011, 6381, 3315, 1912, 6225, 2697, 6230, 2923, 4023, 778, 4638, 8024, 1146, 7030, 102]
token_type_ids [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
attention_mask [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
mask_positions [5, 6]
mask_labels [4510, 5554]
30
2.8 数据集导入器
data_handle目录下的data_loader.py只有一个函数,即get_data,它的目的是获得数据导入器,只要前面的template.py和data_preprocess.py掌握之后,这里的代码理解起来很容易。
# coding:utf-8
from torch.utils.data import DataLoader
from transformers import default_data_collator
from data_preprocess import *def get_data(prompt_file, train_path, dev_path, max_seq_len, max_label_len, batch_size, tokenizer):"""准备数据集函数。该函数从指定的文件中读取提示词模板,加载训练集和验证集,并将它们转换为模型可以接受的格式。参数:- prompt_file: 提示词模板文件的路径。- train_path: 训练集文件的路径。- dev_path: 验证集文件的路径。- max_seq_len: 输入序列的最大长度。- max_label_len: 标签序列的最大长度。- batch_size: 每个批次的大小。- tokenizer: 分词器。返回:- train_dataloader: 训练集的数据加载器。- dev_dataloader: 验证集的数据加载器。"""# 从文件prompt.txt从读取提示词模板prompt = open(prompt_file, 'r', encoding='utf8').readlines()[0].strip()# 创建硬模板对象hard_template = HardTemplate(prompt=prompt)# 导入数据dataset = load_dataset('text', data_files={'train': train_path, 'dev': dev_path})# load_dataset会返回一个datasets.dataset_dict.DatasetDict对象,字典里包含 train 键和 dev 键# 将将样本数据转换为模型接收的输入数据。# 函数式编程new_func = partial(convert_example,tokenizer=tokenizer,hard_template=hard_template,max_seq_len=max_seq_len,max_label_len=max_label_len)# 将数据集转换为模型接收的输入数据dataset = dataset.map(new_func, batched=True)# 从数据集对象中获取训练集与验证集train_dataset = dataset["train"]dev_dataset = dataset["dev"]# print('train_dataset', train_dataset[:2])# print('*'*80)# default_data_collator作用,转换为tensor数据类型# 创建DataLoader对象,用于加载数据集train_dataloader = DataLoader(train_dataset,shuffle=True,collate_fn=default_data_collator,batch_size=batch_size)dev_dataloader = DataLoader(dev_dataset,collate_fn=default_data_collator,batch_size=batch_size)return train_dataloader, dev_dataloaderif __name__ == '__main__':# 创建分词器tokenizer = AutoTokenizer.from_pretrained('../../预训练模型/bert-base-chinese')# 调用get_data函数,获取训练集和验证集的数据加载器train_dataloader, dev_dataloader = get_data(prompt_file='../data/prompt.txt',train_path='../data/train.txt',dev_path='../data/dev.txt',max_seq_len=512,max_label_len=2,batch_size=8,tokenizer=tokenizer)print('训练集batch数量:', len(train_dataloader))print('验证集batch数量:', len(dev_dataloader))print('-'*80)for i, value in enumerate(train_dataloader):print('type(value): ', type(value))print('value.keys(): ', value.keys())print('len(value): ', len(value))print(r"value['input_ids'].shape: ", value['input_ids'].shape)print(r"value['input_ids'].dtype: ", value['input_ids'].dtype)
输出
训练集batch数量: 8
验证集batch数量: 74
--------------------------------------------------------------------------------
type(value): <class 'dict'>
value.keys(): dict_keys(['input_ids', 'token_type_ids', 'attention_mask', 'mask_positions', 'mask_labels'])
len(value): 5
value['input_ids'].shape: torch.Size([8, 512])
value['input_ids'].dtype: torch.int64
从输出中可以看到,value['input_ids'].shape
为torch.Size([8, 512])
,8表示batch_size,512表示句子的最大长度。
3 模型配置与推理
3.1 配置文件
在当前目录(PET)下有个名为pet_config.py的文件,它是模型的配置文件,即把相关的配置参数都写到了一个类中,代码如下:
# coding:utf-8
import torchclass ProjectConfig(object):def __init__(self):self.device = 'cuda:0' if torch.cuda.is_available() else 'cpu' # windows电脑/linux服务器self.pre_model = r'../预训练模型/bert-base-chinese'self.train_path = r'data/train.txt'self.dev_path = r'data/dev.txt'self.prompt_file = r'data/prompt.txt'self.verbalizer = r'data/verbalizer.txt'self.max_seq_len = 512 # 输入文本的最大长度self.batch_size = 8self.learning_rate = 5e-5 # 学习率,在NLP中,一般1e-6 ~ 1e-5之间self.weight_decay = 0 # L2正则化系数self.warmup_ratio = 0.06 # 学习率预热参数self.max_label_len = 2 # 标签的最大长度(字符个数)self.epochs = 2self.logging_steps = 10 # 日志相关设置self.valid_steps = 20 # 每隔多少个step做一次验证self.save_dir = r'checkpoints'if __name__ == '__main__':pc = ProjectConfig()print(pc.prompt_file)print(pc.pre_model)
输出
data/prompt.txt
../预训练模型/bert-base-chinese
有了上一篇文章(全量微调项目——基于GPT2 搭建医疗问诊机器人)和刚刚介绍的内容为基础,这个脚本看懂并不难,这里不做赘述。
我们这边只是为了简单演示,epoch配置的是2,实际工作中,不可能用那么小的。
3.2 模型推理
在当前目录下,有个名为inference.py的脚本,内容如下:
import time
from typing import Listimport torch
from rich import print
from transformers import AutoTokenizer, AutoModelForMaskedLMimport sys
sys.path.append('data_handle')from utils.verbalizer import Verbalizer
from data_handle.template import HardTemplate
from data_handle.data_preprocess import convert_example
from utils.common_utils import convert_logits_to_idsdef inference(contents: List[str]):"""推理函数,输入原始句子,输出mask label的预测值。Args:contents (List[str]): 描原始句子列表。"""device = 'cuda:0' if torch.cuda.is_available() else 'cpu'model_path = 'checkpoints/model_best_old'# 创建分词器tokenizer = AutoTokenizer.from_pretrained(model_path)# 创建模型model = AutoModelForMaskedLM.from_pretrained(model_path)model.to(device).eval()# 标签最大长度max_label_len = 2# 创建标签映射器verbalizer = Verbalizer(verbalizer_file='data/verbalizer.txt',tokenizer=tokenizer,max_label_len=max_label_len)# prompt定义prompt = open('data/prompt.txt', 'r', encoding='utf8').readlines()[0].strip()# 创建硬模板hard_template = HardTemplate(prompt=prompt)with torch.no_grad():start_time = time.time()# 将样本数据转换为模型接收的输入数据。examples = {'text': contents}tokenized_output = convert_example(examples, tokenizer, hard_template=hard_template,max_seq_len=128,max_label_len=max_label_len,train_mode=False,return_tensor=True)# 模型推理logits = model(input_ids=tokenized_output['input_ids'].to(device),token_type_ids=tokenized_output['token_type_ids'].to(device),attention_mask=tokenized_output['attention_mask'].to(device)).logits# logits的形状为(batch_size, seq_len, vocab_size)# 推理结果后处理predictions = convert_logits_to_ids(logits, tokenized_output['mask_positions']).cpu().numpy().tolist() # (batch, label_num)predictions = verbalizer.batch_find_main_label(predictions) # 找到子label属于的主labelpredictions = [ele['label'] for ele in predictions]used = time.time() - start_timeprint(f'Used {used}s.')return predictionsif __name__ == '__main__':# 测试数据contents = ['天台很好看,躺在躺椅上很悠闲,因为活动所以我觉得性价比还不错,适合一家出行,特别是去迪士尼也蛮近的,下次有机会肯定还会再来的,值得推荐','环境,设施,很棒,周边配套设施齐全,前台小姐姐超级漂亮!酒店很赞,早餐不错,服务态度很好,前台美眉很漂亮。性价比超高的一家酒店。强烈推荐',"物流超快,隔天就到了,还没用,屯着出游的时候用的,听方便的,占地小","福行市来到无早集市,因为是喜欢的面包店,所以跑来集市看看。第一眼就看到了,之前在微店买了小刘,这次买了老刘,还有一直喜欢的巧克力磅蛋糕。好奇老板为啥不做柠檬磅蛋糕了,微店一直都是买不到的状态。因为不爱碱水硬欧之类的,所以期待老板多来点其他小点,饼干一直也是大爱,那天好像也没看到","服务很用心,房型也很舒服,小朋友很喜欢,下次去嘉定还会再选择。床铺柔软舒适,晚上休息很安逸,隔音效果不错赞,下次还会来"]print("针对下面的文本评论,请分别给出对应所属类别:")# 推理res = inference(contents)#print('inference label(s):', res)# 将测试数据与推理结果拼接成字典new_dict = {}for i in range(len(contents)):new_dict[contents[i]] = res[i]print(new_dict)
输出
针对下面的文本评论,请分别给出对应所属类别:
Used 0.44545769691467285s.
{'天台很好看,躺在躺椅上很悠闲,因为活动所以我觉得性价比还不错,适合一家出行
,特别是去迪士尼也蛮近的,下次有机会肯定还会再来的,值得推荐': '酒店','环境,设施,很棒,周边配套设施齐全,前台小姐姐超级漂亮!酒店很赞,早餐不错
,服务态度很好,前台美眉很漂亮。性价比超高的一家酒店。强烈推荐': '酒店','物流超快,隔天就到了,还没用,屯着出游的时候用的,听方便的,占地小':
'衣服','福行市来到无早集市,因为是喜欢的面包店,所以跑来集市看看。第一眼就看到了,
之前在微店买了小刘,这次买了老刘,还有一直喜欢的巧克力磅蛋糕。好奇老板为啥不做
柠檬磅蛋糕了,微店一直都是买不到的状态。因为不爱碱水硬欧之类的,所以期待老板多
来点其他小点,饼干一直也是大爱,那天好像也没看到': '衣服','服务很用心,房型也很舒服,小朋友很喜欢,下次去嘉定还会再选择。床铺柔软舒适
,晚上休息很安逸,隔音效果不错赞,下次还会来': '酒店'
}
代码中的模型路径,是在训练之后得到的最优模型,当然,这份代码是训练完之后讲解的,因此有这个。
在推理结果后处理的部分,使用了convert_logits_to_ids
函数,以及标签映射器的batch_find_main_label
,我们来逐个介绍。
3.3 convert_logits_to_ids函数
这个函数的代码在 PET/utils/common_utils.py
中,它将mask位置的预测结果转化为词表中的token_id
,代码如下:
import torch
def convert_logits_to_ids(logits: torch.tensor, mask_positions: torch.tensor):"""输入Language Model的词表概率分布(LMModel的logits,即模型的输出),将mask_position位置的token logits转换为token的id。Args:logits (torch.tensor): model output -> (batch, seq_len, vocab_size)mask_positions (torch.tensor): mask token的位置 -> (batch, max_label_len)Returns:torch.LongTensor: 对应mask position上最大概率的推理token -> (batch, max_label_len)"""label_length = mask_positions.size()[1] # 标签长度batch_size, seq_len, vocab_size = logits.size()# 输出的逻辑值打平,方便后续从中获取最大概率的token idlogits = logits.reshape(batch_size * seq_len, -1) # (batch_size * seq_len, vocab_size)# 将mask_positions转换为逻辑值打平后的索引mask_positions_after_reshaped = []for batch, mask_pos in enumerate(mask_positions.detach().cpu().numpy().tolist()):for pos in mask_pos:mask_positions_after_reshaped.append(batch * seq_len + pos)# 找到mask位置的预测概率分布mask_logits = logits[mask_positions_after_reshaped] # (batch_size * label_num, vocab_size)# 找到每个mask位置的最大概率所在索引(字典中的索引),即预测的token_idpredict_tokens = mask_logits.argmax(dim=-1) # (batch_size * max_label_len)# 将获得的token_id转化成batch的形式predict_tokens = predict_tokens.reshape(-1, label_length) # (batch_size, max_label_len)return predict_tokensif __name__ == '__main__':logits = torch.randn(2, 20, 21193)mask_positions = torch.LongTensor([[5, 6],[5, 6],])predict_tokens = convert_logits_to_ids(logits, mask_positions)print(predict_tokens)
输出
tensor([[20063, 20244],[ 6725, 7702]])
当然,PET/utils/common_utils.py
中还有另一个计算损失的函数,这个我们后面会讲。
3.4 标签映射类(第一部分)
推理的时候,创建了一个标签映射器,因为推理推出来的很可能是子标签,这里需要将其转化为主标签。
该类的代码在PET/utils/verbalizer.py
中,这个类的方法太多,代码太长,我这里只讲推理中用到的,剩下的需要到训练的时候再介绍
# -*- coding:utf-8 -*-
from typing import Union, List # Union 是 typing 模块中定义的一个类,用于表示多个类型中的任意一种类型class Verbalizer(object):def __init__(self, verbalizer_file: str, tokenizer, max_label_len: int):"""Args:verbalizer_file (str): verbalizer文件存放地址。tokenizer: 用于文本和id之间的转换。max_label_len (int): 标签长度,若大于则截断,若小于则补齐"""self.tokenizer = tokenizerself.label_dict = self.load_label_dict(verbalizer_file)self.max_label_len = max_label_lendef load_label_dict(self, verbalizer_file: str):"""读取本地文件,构建verbalizer字典(标签映射字典)。Args:verbalizer_file (str): verbalizer文件存放地址。Returns:dict -> {'体育': ['篮球', '足球','网球', '排球', ...],'酒店': ['宾馆', '旅馆', '旅店', '酒店', ...],...}"""label_dict = {}with open(verbalizer_file, 'r', encoding='utf8') as f:# 逐行读取for line in f.readlines():label, sub_labels = line.strip().split('\t')label_dict[label] = list(set(sub_labels.split(','))) # 这里使用集合来去重return label_dictdef find_main_label(self, sub_label: List[Union[list, str]], hard_mapping=True):"""通过子标签找到父标签。Args:sub_label (List[Union[list, str]]): 子标签, 文本型 或 id_list, e.g. -> '苹果' or [5741, 3362]hard_mapping (bool): 当生成的词语不在子标签中,是否一定要匹配到一个最相似的label。Returns:dict -> {'label': '水果', 'token_ids': [3717, 3362]}"""# 如果传入为id_list, 则通过tokenizer转回来if type(sub_label) == list:# 移除[PAD]token(如果有)pad_token_id = self.tokenizer.pad_token_idwhile pad_token_id in sub_label:sub_label.remove(pad_token_id)# 将id_list转换为字符串sub_label = ''.join(self.tokenizer.convert_ids_to_tokens(sub_label))# 遍历标签映射字典,看标签是否在映射字典的子标签中main_label = '无'for label, s_labels in self.label_dict.items():if sub_label in s_labels:main_label = labelbreak# 如果标签不在映射字典的子标签中,那么main_label将是'无‘# 此时若hard_mapping为True,则通过最大公共子串匹配最相似的labelif main_label == '无' and hard_mapping:main_label = self.hard_mapping(sub_label)# 构建返回结果样式ret = {'label': main_label,'token_ids': self.tokenizer(main_label)['input_ids'][1:-1] # 通过分词器转为id_list,并去掉[CLS]和[SEP]}return retdef batch_find_main_label(self, sub_label: List[Union[list, str]], hard_mapping=True):"""批量通过子标签找父标签。"""return [self.find_main_label(l, hard_mapping) for l in sub_label]
inference.py中调用的是batch_find_main_label
函数,而batch_find_main_label
则是通过列表生成式调用find_main_label
函数,通过代码中的注释,能比较好地理解这个函数在坐什么。
对于衣物类的评论,有可能模型生成的词汇是“裤子”,而它不在“衣服”的子标签中,这个时候需要使用强匹配策略。
find_main_label
函数中,如果大模型生成的词汇(预测的子标签),不在映射字典的子标签中,则调用hard_mapping
函数,该函数是通过寻找与生成的词汇有最大公共子串的子标签(映射字典中的子标签)来确定主标签,因此调用了寻找最大公共子串的函数。(当然,如果是我的话,我会使用词嵌入的方式寻找最接近的子标签,进而推导出主标签,因为找公共子串不太靠谱)
代码如下:
def hard_mapping(self, sub_label: str):"""强匹配函数,当模型生成的子label不存在时,通过最大公共子串找到重合度最高的主label。Args:sub_label (str): 子label。Returns:str: 主label。"""label, max_overlap_str = '', 0# 遍历映射字典,寻找重合度最高的主labelfor main_label, sub_labels in self.label_dict.items():overlap_num = 0# 求所有子label与当前推理label之间的公共子串长度之和for s_label in sub_labels:overlap_num += self.get_common_sub_str(sub_label, s_label)[1]if overlap_num >= max_overlap_str:max_overlap_str = overlap_numlabel = main_labelreturn labeldef get_common_sub_str(self, str1: str, str2: str):"""寻找最大公共子串。str1:abcdstr2:abadbcdba返回公共字串和最大匹配长度"""lstr1, lstr2 = len(str1), len(str2)# 生成0矩阵,为方便后续计算,比字符串长度多了一列record = [[0 for i in range(lstr2 + 1)] for j in range(lstr1 + 1)]p = 0 # 最长匹配对应在str1中的最后一位maxNum = 0 # 最长匹配长度for i in range(lstr1):for j in range(lstr2):if str1[i] == str2[j]:record[i+1][j+1] = record[i][j] + 1if record[i+1][j+1] > maxNum:maxNum = record[i+1][j+1]p = i + 1return str1[p-maxNum:p], maxNum
再写段测试代码看看功能:
if __name__ == '__main__':from rich import printfrom transformers import AutoTokenizertokenizer = AutoTokenizer.from_pretrained('../../预训练模型/bert-base-chinese')verbalizer = Verbalizer(verbalizer_file='../data/verbalizer.txt',tokenizer=tokenizer,max_label_len=2)sub_label = ['西瓜', '裤子', '牛奶']ret = verbalizer.batch_find_main_label(sub_label, hard_mapping=True)print(ret)
输出
[{'label': '水果', 'token_ids': [3717, 3362]},{'label': '衣服', 'token_ids': [6132, 3302]},{'label': '蒙牛', 'token_ids': [5885, 4281]}
]
4 模型训练(微调)
4.1 标签映射类(第二部分)
模型在训练时,是以子标签作为目标值,但我们拿到的样本数据,对应的标签却是主标签,所以在计算损失之前,需要根据主标签获取子标签。
前面我们介绍标签映射类的时候,只讲了由子标签找主标签,其实还有一部分代码,即由主标签找子标签,代码如下:
def find_sub_labels(self, label: Union[list, str]):"""通过标签找到所有的子标签。Args:label (Union[list, str]): 主标签, 文本型 或 id_list, e.g. -> '体育' or [860, 5509]Returns:dict -> {'sub_labels': ['足球', '网球'],'token_ids': [[6639, 4413], [5381, 4413]]}"""# 如果传入为id_list, 则通过tokenizer转回来if type(label) == list:# 移除[PAD]token(如果有)while self.tokenizer.pad_token_id in label:label.remove(self.tokenizer.pad_token_id)# 将id_list转换为字符串label = ''.join(self.tokenizer.convert_ids_to_tokens(label))# 如果主标签不在标签映射字典中,则报错if label not in self.label_dict:raise ValueError(f'Lable Error: "{label}" not in label_dict {list(self.label_dict)}.')# 能执行到这里,说明没有报错,则直接从标签映射字典中获取子标签sub_labels = self.label_dict[label]# 将每个子标签转化为token id,并去掉[CLS]和[SEP]token_ids = [_id[1:-1] for _id in self.tokenizer(sub_labels)['input_ids']]# 遍历每个子标签,并进行截断与补齐for i in range(len(token_ids)):# 截断token_ids[i] = token_ids[i][:self.max_label_len]# 补齐if len(token_ids[i]) < self.max_label_len:token_ids[i] = (token_ids[i] + [self.tokenizer.pad_token_id] *(self.max_label_len - len(token_ids[i])))# 返回子标签和token idret = {'sub_labels': sub_labels, 'token_ids': token_ids}return retdef batch_find_sub_labels(self, label: List[Union[list, str]]):"""批量找到子标签。"""return [self.find_sub_labels(l) for l in label]
看懂了标签映射类(第一部分),这里的两个函数也很容易看懂。
4.2 损失函数
计算损失的函数在PET/utils/common_utils.py
中,这里需要注意的一个点是,假如某个样本的主标签为衣服,则它的主标签有三个,分别为衣服、裙子、西装,那么计算损失的时候,是计算预测值与这三个子标签的平均损失。
代码如下:
def mlm_loss(logits, mask_positions, sub_mask_labels, cross_entropy_criterion, device):"""计算指定位置的mask token的output与label之间的cross entropy loss。Args:logits (torch.tensor): 模型原始输出 -> (batch, seq_len, vocab_size)mask_positions (torch.tensor): mask token的位置 -> (batch, mask_label_num)sub_mask_labels (list): mask token的sub label, 子标签已转为token id,列表中的元素个数为 batche.g. -> [[[2398, 3352]],[[2398, 3352], [3819, 3861]]]cross_entropy_criterion (CrossEntropyLoss): CE Loss计算器device (str): cpu还是gpuReturns:torch.tensor: CE Loss"""batch_size, seq_len, vocab_size = logits.size()loss = Nonefor single_value in zip(logits, sub_mask_labels, mask_positions):# 获取单个样本的相关信息single_logits = single_value[0] # (seq_len, vocab_size)single_sub_mask_labels = single_value[1] # (mask_label_num,)single_mask_positions = single_value[2] # 列表,因为主标签对应的子标签有可能有多个# 从模型输出中获取mask位置的logitssingle_mask_logits = single_logits[single_mask_positions] # (mask_label_num, vocab_size)# 广播重复,使第一个维度与子标签的数量相同,方便后续计算损失single_mask_logits = single_mask_logits.repeat(len(single_sub_mask_labels), 1, 1) # (sub_label_num, mask_label_num, vocab_size)# 修改维度single_mask_logits = single_mask_logits.reshape(-1, vocab_size) # (sub_label_num * mask_label_num, vocab_size)# 将子标签转换为torch张量,并迁移设备single_sub_mask_labels = torch.LongTensor(single_sub_mask_labels).to(device) # (sub_label_num, mask_label_num)# 调整标签维度single_sub_mask_labels = single_sub_mask_labels.reshape(-1, 1).squeeze() # (sub_label_num * mask_label_num)# 计算损失cur_loss = cross_entropy_criterion(single_mask_logits, single_sub_mask_labels)# 除以标签数量,得到相对于每个子标签的损失cur_loss = cur_loss / len(single_sub_mask_labels)# 累加损失if not loss:loss = cur_losselse:loss += cur_loss# 计算每个样本的平均损失loss = loss / batch_size # (1,)return loss
4.3 模型的评估指标类
计算模型评估指标的代码在PET/utils/metirc_utils.py
中,这里写了一个名为ClassEvaluator的类,该类用来计算模型的各个评估指标,代码如下:
# coding='utf-8'
import numpy as np
from typing import List
from sklearn.metrics import accuracy_score, precision_score, f1_score, recall_score, confusion_matrixclass ClassEvaluator(object):def __init__(self):self.goldens = [] # 真实标签(主标签)self.predictions = [] # 预测标签(主标签)def add_batch(self, pred_batch: List[List], gold_batch: List[List]):"""添加一个batch中的prediction和gold列表,用于后续统一计算这里的 pred_batch 和 gold_batch 都是子标签Args:pred_batch (list): 模型预测标签列表, e.g. -> [['体', '育'], ['财', '经'], ...]gold_batch (list): 真实标签标签列表, e.g. -> [['体', '育'], ['财', '经'], ...]"""assert len(pred_batch) == len(gold_batch)# 批量拼接# [['体', '育'], ['财', '经'], ...] -> ['体育', '财经', ...]if type(gold_batch[0]) in [list, tuple]:# 将所有的label拼接为一个整label: ['体', '育'] -> '体育'pred_batch = [''.join([str(e) for e in ele]) for ele in pred_batch]# ele是单个样本,例如 ['体', '育'], ''.join([str(e) for e in ele])则是批量拼接gold_batch = [''.join([str(e) for e in ele]) for ele in gold_batch]# 扩展真实值与预测值的列表self.goldens.extend(gold_batch)self.predictions.extend(pred_batch)def compute(self, round_num=2) -> dict:"""根据当前类中累积的变量值,计算当前的P, R, F1。Args:round_num (int): 计算结果保留小数点后几位, 默认小数点后2位。Returns:dict -> {'accuracy': 准确率,'precision': 精准率,'recall': 召回率,'f1': f1值,'class_metrics': {'0': {'precision': 该类别下的precision,'recall': 该类别下的recall,'f1': 该类别下的f1},...}}"""classes, class_metrics, res = sorted(list(set(self.goldens) | set(self.predictions))), {}, {}# 先将self.goldens和self.predictions两个列表转换为集合(set),然后对它们求并集,随后转列表并排序,得到classes# classes是由字符串构成的列表,其中包含所有真实值与预测值# 求总体的精度,准确率,召回率,F1res['accuracy'] = round(accuracy_score(self.goldens, self.predictions), round_num)res['precision'] = round(precision_score(self.goldens, self.predictions, average='weighted'), round_num)res['recall'] = round(recall_score(self.goldens, self.predictions, average='weighted'), round_num)res['f1'] = round(f1_score(self.goldens, self.predictions, average='weighted'), round_num)try:# 求混淆矩阵,并计算每个类别的指标conf_matrix = np.array(confusion_matrix(self.goldens, self.predictions)) # (n_class, n_class)assert conf_matrix.shape[0] == len(classes)# 构建每个类别的评价指标for i in range(conf_matrix.shape[0]):precision = 0 if sum(conf_matrix[:, i]) == 0 else conf_matrix[i, i] / sum(conf_matrix[:, i])recall = 0 if sum(conf_matrix[i, :]) == 0 else conf_matrix[i, i] / sum(conf_matrix[i, :])f1 = 0 if (precision + recall) == 0 else 2 * precision * recall / (precision + recall)class_metrics[classes[i]] = {'precision': round(precision, round_num),'recall': round(recall, round_num),'f1': round(f1, round_num)}# 将每个类别的评价指标加入到返回结果中,这里class_metrics是包含各个类别的指标的字典res['class_metrics'] = class_metricsexcept Exception as e:print(f'[Warning] Something wrong when calculate class_metrics: {e}')print(f'-> goldens: {set(self.goldens)}')print(f'-> predictions: {set(self.predictions)}')print(f'-> diff elements: {set(self.predictions) - set(self.goldens)}')res['class_metrics'] = {}return resdef reset(self):"""重置积累的数值,即清空真实值与预测值的列表"""self.goldens = []self.predictions = []if __name__ == '__main__':from rich import printmetric = ClassEvaluator()metric.add_batch([['财', '经'], ['财', '经'], ['体', '育'], ['体', '育'], ['计', '算', '机']],[['体', '育'], ['财', '经'], ['体', '育'], ['计', '算', '机'], ['计', '算', '机']],)print(metric.compute())
输出:
{'accuracy': 0.6,'precision': 0.7,'recall': 0.6,'f1': 0.6,'class_metrics': {'体育': {'precision': 0.5, 'recall': 0.5, 'f1': 0.5},'计算机': {'precision': 1.0, 'recall': 0.5, 'f1': 0.67},'财经': {'precision': 0.5, 'recall': 1.0, 'f1': 0.67}}
}
这个类的用法:创建ClassEvaluator对象后,模型每预测一个batch的样本,就把真实标签(主标签)与预测值(同样是主标签)通过add_batch
方法加入进去,当所有用来评估的样本计算完之后,再调用compute
方法计算。
4.4 模型微调
模型的微调代码在PET/train.py
中,这里有两个函数,我们先讲model2train:
import os
import time
from transformers import AutoModelForMaskedLM, AutoTokenizer, get_schedulerimport sys
sys.path.append('data_handle')from utils.metirc_utils import ClassEvaluator
from utils.common_utils import *
from data_handle.data_loader import *
from utils.verbalizer import Verbalizerfrom pet_config import *
pc = ProjectConfig()def model2train():# 创建分词器、训练模型、标签映射器tokenizer = AutoTokenizer.from_pretrained(pc.pre_model)model = AutoModelForMaskedLM.from_pretrained(pc.pre_model)verbalizer = Verbalizer(verbalizer_file=pc.verbalizer,tokenizer=tokenizer,max_label_len=pc.max_label_len)# 指定不需要权重衰减(L2正则化)的参数,通过指定参数名所包含的字符串来进行no_decay = ["bias", "LayerNorm.weight"] # 如果参数名包含"bias"或者"LayerNorm.weight",则对应参数不进行L2正则化optimizer_grouped_parameters = [{"params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],"weight_decay": pc.weight_decay,},{"params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],"weight_decay": 0.0,},]# [nd in n for nd in no_decay],遍历no_decay,若nd在n中则为True,否则False,最后得到一个由布尔值构成的列表# any(List),判断列表中是否有True,如果有则返回True,否则返回False# 如果[nd in n for nd in no_decay]为[False, False],则说明参数的名字中,既不包含bias和LayerNorm.weight# 若参数的名字中,既不包含bias和LayerNorm.weight,则any(...)为False,相关参数需要进行权重衰减# 创建优化器optimizer = torch.optim.AdamW(optimizer_grouped_parameters, lr=pc.learning_rate)model.to(pc.device)# 创建训练集和验证集的数据导入器train_dataloader, dev_dataloader = get_data(pc.prompt_file, pc.train_path, pc.dev_path,pc.max_seq_len, pc.max_label_len,pc.batch_size, tokenizer)# 根据训练轮数计算最大训练步数,以便于scheduler动态调整lrnum_update_steps_per_epoch = len(train_dataloader)# 计算总的训练步数与预热阶段的训练步数,确保学习率在整个训练过程中得以合理地调节max_train_steps = pc.epochs * num_update_steps_per_epochwarm_steps = int(pc.warmup_ratio * max_train_steps)# 创建学习率调度器lr_scheduler = get_scheduler(name='linear',optimizer=optimizer,num_warmup_steps=warm_steps,num_training_steps=max_train_steps,)# 创建评估指标计算器与损失函数计算器metric = ClassEvaluator()criterion = torch.nn.CrossEntropyLoss()# 训练loss_list = []global_step, best_f1 = 0, 0print('开始训练:')tic_train = time.time()for epoch in range(pc.epochs):for batch in train_dataloader:# 模型预测logits = model(input_ids=batch['input_ids'].to(pc.device),token_type_ids=batch['token_type_ids'].to(pc.device),attention_mask=batch['attention_mask'].to(pc.device)).logits# 真实标签(主标签)mask_labels = batch['mask_labels'].numpy().tolist()# 根据主标签找子标签sub_labels = verbalizer.batch_find_sub_labels(mask_labels)# 获取子标签的idsub_labels = [ele['token_ids'] for ele in sub_labels]# 计算损失loss = mlm_loss(logits, batch['mask_positions'].to(pc.device), sub_labels,criterion, pc.device)# 梯度清零、反向传播、更新模型参数、更新学习率optimizer.zero_grad()loss.backward()optimizer.step()lr_scheduler.step()# 记录损失loss_list.append(float(loss.cpu().detach()))# 全局步数+1global_step += 1# 打印信息if global_step % pc.logging_steps == 0:time_diff = time.time() - tic_trainloss_avg = sum(loss_list) / len(loss_list)print("global step %d, epoch: %d, loss: %.5f, speed: %.2f step/s"% (global_step, epoch, loss_avg, pc.logging_steps / time_diff))tic_train = time.time()# 保存与评估模型if global_step % pc.valid_steps == 0:cur_save_dir = os.path.join(pc.save_dir, "model_%d" % global_step)if not os.path.exists(cur_save_dir):os.makedirs(cur_save_dir)# 保存模型model.save_pretrained(os.path.join(cur_save_dir))tokenizer.save_pretrained(os.path.join(cur_save_dir))# 评估模型acc, precision, recall, f1, class_metrics = evaluate_model(model,metric,dev_dataloader,tokenizer,verbalizer)print("Evaluation precision: %.5f, recall: %.5f, F1: %.5f" % (precision, recall, f1))# 保存最好的模型if f1 > best_f1:print(f"best F1 performence has been updated: {best_f1:.5f} --> {f1:.5f}")print(f'Each Class Metrics are: {class_metrics}')best_f1 = f1cur_save_dir = os.path.join(pc.save_dir, "model_best")if not os.path.exists(cur_save_dir):os.makedirs(cur_save_dir)model.save_pretrained(os.path.join(cur_save_dir))tokenizer.save_pretrained(os.path.join(cur_save_dir))tic_train = time.time()print('训练结束')
在评估模型时,调用了evaluate_model
函数,该函数正是PET/train.py
脚本中的另一个函数:
def evaluate_model(model, metric, data_loader, tokenizer, verbalizer):"""在测试集上评估当前模型的训练效果。Args:model: 当前模型metric: 评估指标类(metric)data_loader: 测试集的dataloaderglobal_step: 当前训练步数"""model.eval()metric.reset()with torch.no_grad():for step, batch in enumerate(data_loader):# 模型预测logits = model(input_ids=batch['input_ids'].to(pc.device),token_type_ids=batch['token_type_ids'].to(pc.device),attention_mask=batch['attention_mask'].to(pc.device)).logits# 真实标签(主标签)mask_labels = batch['mask_labels'].numpy().tolist() # (batch, label_num)# 去掉label中的[PAD]for i in range(len(mask_labels)):while tokenizer.pad_token_id in mask_labels[i]:mask_labels[i].remove(tokenizer.pad_token_id)# token id转文字mask_labels = [''.join(tokenizer.convert_ids_to_tokens(t)) for t in mask_labels]# 将mask_position位置的token logits转换为token的idpredictions = convert_logits_to_ids(logits, batch['mask_positions']) # (batch, label_num)predictions = predictions.cpu().numpy().tolist()# 找到子label属于的主labelpredictions = verbalizer.batch_find_main_label(predictions)predictions = [ele['label'] for ele in predictions] # 获取字符串# 将预测值与真实值加入到评估指标计算器中(暂不计算)metric.add_batch(pred_batch=predictions, gold_batch=mask_labels)# 计算评估指标eval_metric = metric.compute()# 模型切换回训练模式model.train()# 返回评估指标return eval_metric['accuracy'], eval_metric['precision'], \eval_metric['recall'], eval_metric['f1'], \eval_metric['class_metrics']
我们再在这个脚本的最下面写一段测试代码:
if __name__ == '__main__':model2train()
输出:
Some weights of the model checkpoint at ../预训练模型/bert-base-chinese were not used when initializing BertForMaskedLM: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Map: 100%|██████████| 63/63 [00:00<00:00, 941.55 examples/s]
Map: 100%|██████████| 590/590 [00:00<00:00, 1885.03 examples/s]
开始训练:
global step 10, epoch: 1, loss: 0.77675, speed: 0.07 step/s
训练结束
由于epoch配置的是2,所以训练两轮就结束了,实际训练时,需要把epoch改大。
好的,至此,该项目的代码讲解完毕。