目录
一、简介
二、模型训练
三、模型推理
一、简介
BERT(Bidirectional Encoder Representations from Transformers)是基于深度学习在自然语言处理(NLP)领域近几年出现的、影响深远的创新模型之一。在BERT之前,已经有许多预训练语言模型,如ELMO和GPT,它们展示了预训练模型在NLP任务中的强大性能。然而,这些模型通常基于单向的上下文信息,即只考虑文本中的前向或后向信息,这限制了它们对文本的全局理解。BERT旨在通过引入双向上下文信息来解决这一问题,从而更准确地表示文本中的语义信息。
与传统的单向语言模型相比,BERT 的核心优势在于:
- 双向性:BERT通过使用Transformer的编码器结构,能够同时从文本的左右两个方向学习上下文信息,使模型能够更好地理解句子中的每个词的语义。
- 预训练与微调:通过预训练任务,BERT 可以在多种下游任务上进行快速微调。
其中,Bert-base-chinese模型是一个在简体和繁体中文文本上训练得到的预训练模型。
二、模型训练
数据示例如下,现在有一个data.csv文件,包含两列分别是特征(feature)和标签(label)。其中,标签可能是多个分类。
第一步:读取数据并提取出特征和标签
data = pd.read_csv('./data/data.csv', encoding='utf-8') # 如果表格数据是gbk格式,则修改encoding='gbk'
X = data['feature'] # 特征列
y = data['label'].values # 标签列
第二步:对标签数据进行编码转换
label_encoder = LabelEncoder() # 初始化
y_encoded = label_encoder.fit_transform(y)
joblib.dump(label_encoder, './data/encoder.joblib') # 保存 label_encoder 以便以后使用
print(f'分类数:{len(label_encoder.classes_)} \n') # 标签的类别数量
第三步:划分训练数据集和测试数据集
X_train, X_val, y_train, y_val = train_test_split(X, y_encoded, test_size=0.1,random_state=42) # test_size=0.1表示训练集和测试集划分比例是9:1
# random_state=42表示固定随机种子为42,保证每一次分割数据集都是一样的结果
第四步:加载BERT分词器
local_model_path = './bert-base-chinese' # 模型地址
tokenizer = BertTokenizer.from_pretrained(local_model_path)
tokenizer.save_pretrained(best_model_path)
第五步:将文本数据转换成BERT模型能够理解的格式
def preprocess_for_bert(data, labels):input_ids = []attention_masks = []for sent in data: # 对每个句子(sent)进行编码处理encoded_sent = tokenizer.encode_plus(text=sent, # 要处理的句子add_special_tokens=True, # 添加特殊标记,如句子的起始标记和结束标记max_length=256, # 句子的最大长度为256个标记,超出部分将被截断,不足部分将被填充padding='max_length', # 将句子填充到固定长度(256),不足部分会用0补齐return_attention_mask=True, # 返回注意力掩码,用于标记哪些位置是填充部分,哪些位置是实际的句子内容truncation=True # 如果句子超过了最大长度,进行截断)input_ids.append(encoded_sent.get('input_ids'))attention_masks.append(encoded_sent.get('attention_mask'))# 转换为PyTorch张量(tensor),以便后续可以输入到模型中进行训练或推理input_ids = torch.tensor(input_ids)attention_masks = torch.tensor(attention_masks)labels = torch.tensor(labels)return input_ids, attention_masks, labelstrain_inputs, train_masks, train_labels = preprocess_for_bert(X_train, y_train)
val_inputs, val_masks, val_labels = preprocess_for_bert(X_val, y_val)
第六步:创建训练集DataLoader和测试集DataLoader
train_data = TensorDataset(train_inputs, train_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=8)validation_data = TensorDataset(val_inputs, val_masks, val_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=8)
第七步:加载BERT模型
model = BertForSequenceClassification.from_pretrained(local_model_path, num_labels=len(label_encoder.classes_),ignore_mismatched_sizes=True)
model.cuda() # 默认使用第一张显卡,如果没有显卡,则可以注释改行代码
第八步:设置优化器和调度器
EPOCHS = 5 # 训练次数,可以自定义修改
optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8) # 优化器
total_steps = len(train_dataloader) * EPOCHS # 训练步数
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps) # 调度器
第九步:设置精确度的计算方式
def flat_accuracy(preds, labels):pred_flat = np.argmax(preds, axis=1).flatten()labels_flat = labels.flatten()return np.sum(pred_flat == labels_flat) / len(labels_flat) # 通过比较预测类别和实际标签的相同之处,计算出预测正确的比例
第十步:训练和评估
best_model_path = './model' # 最优模型训练结果的保存路径
best_val_accuracy = 0 # 初始化最优精确度
for epoch in range(EPOCHS):model.train() # 第一步:将模型设置为训练模式total_train_loss = 0 # 初始化训练总损失为0for step, batch in enumerate(train_dataloader): # 第二步:加载训练集DataLoaderb_input_ids = batch[0].cuda() # 如果没有显卡,则可以将.cuda给删除了b_input_mask = batch[1].cuda() # 如果没有显卡,则可以将.cuda给删除了b_labels = batch[2].cuda().long() # 如果没有显卡,则可以将.cuda给删除了model.zero_grad() # 清除模型的梯度outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels) # 第三步:将输入数据传递给模型,得到模型的输出loss = outputs.loss # 第四步:提取出损失值,用于后续的反向传播total_train_loss += loss.item()loss.backward() # 第五步:进行反向传播,计算梯度torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)optimizer.step()scheduler.step()avg_train_loss = total_train_loss / len(train_dataloader) # 第六步:更新学习率torch.cuda.empty_cache() # 训练一轮就清空一次显卡缓存,如果没有显卡,则注释# 第七步:模型测试,计算准确度,处理逻辑和训练差不多model.eval()total_eval_accuracy = 0total_eval_loss = 0for batch in validation_dataloader: # 加载测试集DataLoaderb_input_ids = batch[0].cuda()b_input_mask = batch[1].cuda()b_labels = batch[2].cuda().long()with torch.no_grad():outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)loss = outputs.losstotal_eval_loss += loss.item()logits = outputs.logitslogits = logits.detach().cpu().numpy()label_ids = b_labels.to('cpu').numpy()total_eval_accuracy += flat_accuracy(logits, label_ids)avg_val_accuracy = total_eval_accuracy / len(validation_dataloader)avg_val_loss = total_eval_loss / len(validation_dataloader)torch.cuda.empty_cache() # 验证一轮就清空一次显卡缓存,如果没有显卡,则注释print(f'Training loss: {avg_train_loss}')print(f'Validation loss: {avg_val_loss}')print(f'Validation Accuracy: {avg_val_accuracy}') # 主要看这个精度,一般准确率90%以上就可以投入实际生产环境中# 在验证集上计算准确率if avg_val_accuracy > best_val_accuracy:best_val_accuracy = avg_val_accuracy# 保存模型model.save_pretrained(best_model_path) # 根据训练次数,保存最优的一个模型结果
完整代码如下:
import pandas as pd
import numpy as np
import joblib
import torch
import timefrom transformers import BertTokenizer, BertForSequenceClassification
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, TensorDataset, random_split
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup# 读取数据
data = pd.read_csv('./data/data.csv', encoding='utf-8') # 如果表格数据是gbk,则修改encoding='gbk'# 最优模型训练结果的保存路径
best_model_path = './model'X = data['feature'] # 特征列
y = data['label'].values # 标签列# 对标签数据进行编码转换
print("1、开始编码转换啦~")
label_encoder = LabelEncoder() # 初始化
#label_encoder = joblib.load('./data/encoder.joblib') # 当你使用同样的data第二次运行脚本时,就可以直接加载上一次保存的编码结果,而不需要重复编码(除非两次加载的数据有变动)
y_encoded = label_encoder.fit_transform(y)
print(f'分类数:{len(label_encoder.classes_)} \n') # 标签的类别数量# 保存 label_encoder 以便以后使用
joblib.dump(label_encoder, './data/encoder.joblib')# 分割数据集
X_train, X_val, y_train, y_val = train_test_split(X, y_encoded, test_size=0.1,random_state=42) # 这里训练和测试数据集比例是9:1,test_size=0.2或者0.3 固定随机种子42,保证每一次分割数据集都是一样的# 加载BERT分词器
local_model_path = './bert-base-chinese'
tokenizer = BertTokenizer.from_pretrained(local_model_path)
tokenizer.save_pretrained(best_model_path)# BERT预处理 -- 将文本数据转换成BERT模型能够理解的格式
def preprocess_for_bert(data, labels):input_ids = []attention_masks = []for sent in data: # 对每个句子(sent)进行编码处理encoded_sent = tokenizer.encode_plus(text=sent, # 要处理的句子add_special_tokens=True, # 添加特殊标记,如句子的起始标记和结束标记max_length=256, # 句子的最大长度为256个标记,超出部分将被截断,不足部分将被填充padding='max_length', # 将句子填充到固定长度(256),不足部分会用0补齐return_attention_mask=True, # 返回注意力掩码,用于标记哪些位置是填充部分,哪些位置是实际的句子内容truncation=True # 如果句子超过了最大长度,进行截断)input_ids.append(encoded_sent.get('input_ids'))attention_masks.append(encoded_sent.get('attention_mask'))# 转换为PyTorch张量(tensor),以便后续可以输入到模型中进行训练或推理input_ids = torch.tensor(input_ids)attention_masks = torch.tensor(attention_masks)labels = torch.tensor(labels)return input_ids, attention_masks, labels# 预处理数据
print("2、开始预处理数据啦~")
train_inputs, train_masks, train_labels = preprocess_for_bert(X_train, y_train)
val_inputs, val_masks, val_labels = preprocess_for_bert(X_val, y_val)# 创建DataLoader
train_data = TensorDataset(train_inputs, train_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=8)validation_data = TensorDataset(val_inputs, val_masks, val_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=8)# 加载BERT模型
print("3、开始预加载模型啦~")
model = BertForSequenceClassification.from_pretrained(local_model_path, num_labels=len(label_encoder.classes_),ignore_mismatched_sizes=True)
model.cuda() # 默认使用第一张显卡# 设置优化器和调度器
EPOCHS = 5 # 训练次数,可以先训练5次看看效果,可以自定义修改
optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8) # 优化器
total_steps = len(train_dataloader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)# 计算精确度 -- 通过比较预测类别和实际标签的相同之处,计算出预测正确的比例
def flat_accuracy(preds, labels):pred_flat = np.argmax(preds, axis=1).flatten()labels_flat = labels.flatten()return np.sum(pred_flat == labels_flat) / len(labels_flat)# 训练和评估
print("4、开始训练啦~")
best_val_accuracy = 0
for epoch in range(EPOCHS):print(f'Epoch {epoch + 1}')now_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) # 记录每一轮的训练开始时间和结束时间print("start time:", now_time)model.train() # 模型设置为训练模式total_train_loss = 0 # 初始化训练总损失为0for step, batch in enumerate(train_dataloader):b_input_ids = batch[0].cuda()b_input_mask = batch[1].cuda()b_labels = batch[2].cuda().long()model.zero_grad() # 清除模型的梯度outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels) # 将输入数据传递给模型,得到模型的输出loss = outputs.loss # 提取出损失值,用于后续的反向传播total_train_loss += loss.item()loss.backward() # 进行反向传播,计算梯度torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)optimizer.step()scheduler.step()avg_train_loss = total_train_loss / len(train_dataloader) # 更新学习率torch.cuda.empty_cache() # 训练一轮就清空一次显卡缓存# 模型测试,计算准确度model.eval()total_eval_accuracy = 0total_eval_loss = 0for batch in validation_dataloader:b_input_ids = batch[0].cuda()b_input_mask = batch[1].cuda()b_labels = batch[2].cuda().long()with torch.no_grad():outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)loss = outputs.losstotal_eval_loss += loss.item()logits = outputs.logitslogits = logits.detach().cpu().numpy()label_ids = b_labels.to('cpu').numpy()total_eval_accuracy += flat_accuracy(logits, label_ids)avg_val_accuracy = total_eval_accuracy / len(validation_dataloader)avg_val_loss = total_eval_loss / len(validation_dataloader)torch.cuda.empty_cache() # 验证一轮就清空一次显卡缓存print(f'Training loss: {avg_train_loss}')print(f'Validation loss: {avg_val_loss}')print(f'Validation Accuracy: {avg_val_accuracy}') # 主要看这个精度,一般准确率90%以上就可以投入实际生产环境中# 在验证集上计算准确率if avg_val_accuracy > best_val_accuracy:best_val_accuracy = avg_val_accuracy# 保存模型model.save_pretrained(best_model_path) # 根据训练次数,保存最优的一个模型结果now_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())print("end time:",now_time)print("-------------------")
三、模型推理
模型训练完成后,现在有一批新数据,你想要使用训练好的模型预测该文本数据的分类结果,则可以使用下面的推理代码,详解看注释。
import pandas as pd
import time
import torch
import joblib
from transformers import BertTokenizer, BertForSequenceClassification
import torch.nn.functional as F# 第一步:加载数据
file_path = './data/detect.csv' # 要推理的数据路径
df = pd.read_csv(file_path, encoding='utf-8')# 第二步:加载训练好的模型
best_model_path = './model'
model = BertForSequenceClassification.from_pretrained(best_model_path)
tokenizer = BertTokenizer.from_pretrained(best_model_path)# 第三步:加载编码(训练时保存的结果)
label_encoder = joblib.load('./data/encoder.joblib')predictions = [] # 预测值
confidence_scores = [] # 可信度,一般可信度大于0.9说明效果比较准确# 第四步:遍历推理数据
for row in df.iterrows():content = row[1]['feature'] # 特征列(推理样本)inputs = tokenizer(content, return_tensors="pt", padding=True, truncation=True, max_length=256)outputs = model(**inputs)probs = F.softmax(outputs.logits, dim=1)pred = torch.argmax(probs, dim=1)confidence = torch.max(probs, dim=1) # 获取置信度的值predictions.append(pred.item())confidence_scores.append(confidence.values.item())# 第五步:将预测结果解码为类别标签
decoded_categories = label_encoder.inverse_transform(predictions)# 第六步:创建一个空的DataFrame来存储推理结果
df['pred'] = decoded_categories
df['confidence_score'] = confidence_scores# 将结果保存到本地
output_file_path = './data/detect_pred.csv' # 保存推理结果的路径
df.to_csv(output_file_path, index=False)
参考文章链接:https://blog.csdn.net/yihong23/article/details/138543746