1. 从JSON格式的数据中加载并预处理样本供Ner任务训练和推理使用
class JsonNerDataset(Dataset):"""定义一个加载json格式原始命名实体识别格式数据的Dataset一行一条样本(json字符串),包含: originalText、entities"""def __init__(self,data_path, tokenizer, categories, target_padding_idx=-100,add_special_token=False, first_special_token='[CLS]', last_special_token='[SEP]'):super(JsonNerDataset, self).__init__()self.tokenizer: BertTokenizer = tokenizer# self.sentence_max_len = self.tokenizer.max_len_single_sentence # 510self.sentence_max_len = 126self.categories = categoriesself.token_padding_idx = self.tokenizer.convert_tokens_to_ids('[PAD]')self.target_padding_idx = target_padding_idxself.add_special_token = add_special_tokenself.first_token_id = self.tokenizer.convert_tokens_to_ids(first_special_token)self.last_token_id = self.tokenizer.convert_tokens_to_ids(last_special_token)self.records = self.load_records(data_path)def load_records(self, data_path) -> list:records = []with open(data_path, "r", encoding="utf-8") as reader:for line in reader:# 1. 获取原始数据record = json.loads(line.strip())# 2. 原始的文本数据转换entities = record['entities']text = special_token_processor(record['originalText'])chars = list(text) # 分字,就是每个字对应一个token# 3. 标签数据转换labels = ['O'] * len(chars)for entity in entities:label_type = entity['label_type']start_pos = entity['start_pos'] # 包含开始end_pos = entity['end_pos'] # 不包含结尾if end_pos - start_pos == 1:# 只有一个字形成实体labels[start_pos] = f'S-{label_type}'elif end_pos - start_pos > 1:# 多个字形成实体labels[start_pos] = f'B-{label_type}'labels[end_pos - 1] = f'E-{label_type}'for i in range(start_pos + 1, end_pos - 1):labels[i] = f'M-{label_type}'else:raise ValueError(f"数据异常:{record}")if self.add_special_token:# 需要对chars、labels进行split分割, 单个样本的长度不能超过510for sub_chars, sub_labels in split_chars(chars, labels, self.sentence_max_len):x = self.tokenizer.convert_tokens_to_ids(sub_chars) # 针对每个字获取对应的idassert len(sub_chars) == len(x), "bert进行token id转换后,更改了列表长度."y = [self.categories[c] for c in sub_labels]x.insert(0, self.first_token_id)x.append(self.last_token_id)y.insert(0, self.categories['O'])y.append(self.categories['O'])assert len(x) == len(y), f"输入token的长度必须和标签的长度一致,当前长度为:{len(x)} - {len(y)} - {record}"records.append((x, y, len(x)))else:x = self.tokenizer.convert_tokens_to_ids(chars) # 针对每个字获取对应的idassert len(chars) == len(x), "bert进行token id转换后,更改了列表长度."y = [self.categories[c] for c in labels]assert len(x) == len(y), f"输入token的长度必须和标签的长度一致,当前长度为:{len(x)} - {len(y)} - {record}"records.append((x, y, len(x)))return recordsdef __getitem__(self, index):"""获取index对应的样本信息,包括x和y:param index: 样本的索引id:return: x,y"""x, y, num_tokens = self.records[index]return copy.deepcopy(x), copy.deepcopy(y), num_tokensdef __len__(self):return len(self.records)def collate(self, batch):max_length = max([t[2] for t in batch])x, y, mask = [], [], []for i in range(len(batch)):_x, _y, _len_current_record = copy.deepcopy(batch[i])_mask = [1] * _len_current_recordif _len_current_record < max_length:_x.extend([self.token_padding_idx] * (max_length - _len_current_record))_y.extend([self.target_padding_idx] * (max_length - _len_current_record))_mask.extend([0] * (max_length - _len_current_record))x.append(_x)y.append(_y)mask.append(_mask)token_ids = torch.tensor(x, dtype=torch.long)# 1表示实际token,0表示填充位置token_masks = torch.tensor(mask, dtype=torch.long)target_ids = torch.tensor(y, dtype=torch.long)return token_ids, token_masks, target_ids
2. Ner(命名实体识别)任务创建数据加载器createNERDataLoader
def create_dataloader(data_path, tokenizer, label_categories, batch_size, shuffle,num_workers=0, prefetch_factor=2, target_padding_idx=-100,add_special_token=False, first_special_token='[CLS]', last_special_token='[SEP]'
):# 创建Dataset对象dataset = JsonNerDataset(data_path=data_path, tokenizer=tokenizer,categories=label_categories,target_padding_idx=target_padding_idx,add_special_token=add_special_token,first_special_token=first_special_token,last_special_token=last_special_token)print(f"当前dataset的总样本数目为:{data_path} - {len(dataset)}")# dataloader实际上是一个批次的处理器,因为dataset可以返回一条一条的样本,dataloader就负责将多条样本组合成一个批次对象返回prefetch_factor = prefetch_factor if num_workers <= 0 else num_workers * batch_sizedataloader = DataLoader(dataset=dataset, # 给定单条样本加载的对象batch_size=batch_size, # 给定批次大小shuffle=shuffle, # 获取批次数据的时候是否打乱顺序num_workers=num_workers, # 加载数据的时候是否用多进程,大于0表示使用num_workers个进程collate_fn=dataset.collate, # 给定批次数据合并的方式prefetch_factor=prefetch_factor # 多进程加载的时候,每个进程的预加载的样本数目,一般为num_workers * batch_size)return dataloader
3. trainNERDataLoader
和evalNERDataLoader
def create_train_dataloader(data_path, tokenizer, label_categories, batch_size, target_padding_idx=-100,add_special_token=False, first_special_token='[CLS]', last_special_token='[SEP]'
):return create_dataloader(data_path, tokenizer, label_categories, batch_size,shuffle=True, num_workers=0, prefetch_factor=2,target_padding_idx=target_padding_idx,add_special_token=add_special_token,first_special_token=first_special_token,last_special_token=last_special_token)def create_eval_dataloader(data_path, tokenizer, label_categories, batch_size, target_padding_idx=-100,add_special_token=False, first_special_token='[CLS]', last_special_token='[SEP]'
):return create_dataloader(data_path, tokenizer, label_categories, batch_size,shuffle=False, num_workers=0, prefetch_factor=2,target_padding_idx=target_padding_idx,add_special_token=add_special_token,first_special_token=first_special_token,last_special_token=last_special_token)
4. text_special_char_replace
def special_token_processor(text):for old, new in [('”', '"'), ("“", '"'), ("’", "'"), ("‘", "'"), ("`", "'"), ('—', '-')]:text = text.replace(old, new)return text
5. 提取所有标签(不重名)
def extract_labels_per_file(in_file):"""从单个 JSON 格式的数据文件中提取所有唯一的标签(label_type)。:param in_file: 输入的 JSON 数据文件路径。:return: 一个包含所有唯一标签的集合。"""# 初始化一个空集合用于存储标签,集合自动去重labels = set()# 打开输入文件进行读取,指定编码为 UTF-8with open(in_file, "r", encoding="utf-8") as reader:# 逐行读取文件内容for line in reader:# 将每行的 JSON 字符串解析为 Python 字典record = json.loads(line.strip())# 获取当前记录中的实体列表entities = record['entities']# 遍历每个实体for entity in entities:# 提取实体的标签类型(label_type),并将其添加到标签集合中labels.add(entity['label_type'])# 返回包含所有唯一标签的集合return labels
6. 生成标签映射字典
def extract_labels(data_paths, categories_out_file):"""提取数据集中所有可能的标签,并生成标签到索引的映射文件。:param data_paths: 数据文件路径(可以是单个路径或路径列表)。:param categories_out_file: 生成的标签映射文件路径。"""# 如果输入的 data_paths 是字符串,则将其转换为列表if isinstance(data_paths, str):data_paths = [data_paths]# 初始化一个空集合用于存储所有标签labels = set()# 遍历每个数据文件路径for data_path in data_paths:# 提取当前文件中的标签tmp_labels = extract_labels_per_file(data_path)# 将当前文件的标签与已有的标签集合合并labels = labels.union(tmp_labels)# 将标签集合转换为排序后的列表labels = sorted(list(labels))# 打印所有提取到的标签print(f"所有的标签:{labels}")# 初始化标签到索引的映射字典,'O' 表示非实体标签,索引为 0categories = {'O': 0}# 遍历每个标签,生成 B-M-E-S 格式的标签for label in labels:for prefix in ['B', 'M', 'E', 'S']:# 为每个标签生成对应的 B-M-E-S 格式,并分配索引categories[f'{prefix}-{label}'] = len(categories)# 打印生成的标签映射字典print(categories)# 将标签映射字典写入指定的输出文件with open(categories_out_file, 'w', encoding='utf-8') as writer:# 使用 json.dump 将字典保存为 JSON 格式,设置缩进为 2,确保中文字符正常显示json.dump(categories, writer, indent=2, ensure_ascii=False)
7. 加载标签类别映射
def load_label_categories(file_path):"""从指定的 JSON 文件中加载标签类别映射(标签到索引的字典)。:param file_path: 包含标签类别映射的 JSON 文件路径。:return: 一个字典,包含标签到索引的映射。"""# 打开指定路径的 JSON 文件进行读取,指定编码为 UTF-8with open(file_path, "r", encoding="utf-8") as reader:# 使用 json.load 从文件中加载 JSON 数据,将其解析为 Python 字典categories = json.load(reader)# 返回解析后的标签类别映射字典return categories
8. 基于 Transformer 编码器的分类模型
# -*- coding: utf-8 -*-
import copyimport torch
import torch.nn as nn
from transformers import BertModelclass TransformerEncoderSoftmaxNerModel(nn.Module):def __init__(self, vocab_size, hidden_size, encoder_num_head, encoder_num_layers, num_class):super(TransformerEncoderSoftmaxNerModel, self).__init__()# 确保 hidden_size 能被 encoder_num_head 整除assert hidden_size % encoder_num_head == 0, "参数中的hidden_size必须能够整除encoder_num_head"# 词嵌入层self.emb_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=hidden_size)# Transformer 编码器层encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_size,nhead=encoder_num_head,dim_feedforward=hidden_size * 4,batch_first=True)self.encoder = nn.TransformerEncoder(encoder_layer=encoder_layer, num_layers=encoder_num_layers)# 分类层(多层感知机)self.classify = nn.Sequential(nn.Dropout(p=0.1),nn.Linear(hidden_size, hidden_size * 4),nn.ReLU(),nn.Dropout(p=0.1),nn.Linear(hidden_size * 4, hidden_size * 4),nn.ReLU(),nn.Linear(hidden_size * 4, num_class),)self.num_class = num_classself.hidden_size = hidden_sizedef forward(self, token_ids, token_masks=None):"""前向过程:param token_ids: [N,T] N个样本,每个样本有T个token:param token_masks: [N,T] N个样本,每个样本的每个token是否是真实的实际token,True表示填充,False表示实际值:return: [N,T,num_class] N个样本,每个样本有T个token,每个token属于num_class个类别的置信度"""if token_masks is not None:mask_dtype = token_masks.dtypeif mask_dtype in [torch.int, torch.int64, torch.int32, torch.int8, torch.int16]:token_masks = (1 - token_masks).to(dtype=torch.bool)# 1. 获取每个token对应的静态词向量 [N,T] -> [N,T,E]token_x = self.emb_layer(token_ids)# 2. token交叉获取每个token对应的动态词向量 [N,T,E] -> [N,T,E]token_x = self.encoder(token_x, src_key_padding_mask=token_masks)# 3. 针对每个token进行全连接,判断属于各个类别的置信度# 最终的矩阵乘法操作为: [N,T,?] * [?,num_class] -> [N, T, num_class]score = self.classify(token_x)return score
9.评估指标-准确率函数
def calc_token_accuracy_number(score, target_labels, target_masks):# 1. 获取模型预测标签id [N,T,num_class] -> [N,T]pred_labels = torch.argmax(score, dim=-1)pred_labels = pred_labels.to(dtype=target_labels.dtype, device=target_labels.device)# print(f"预测为0的标签占比:{torch.mean((pred_labels == 0).to(dtype=torch.float))}")# 2. 比较实际标签和预测标签,查看是否相等is_equal = (pred_labels == target_labels).to(dtype=torch.float)# 3. 计算均值的分子和分母numerator = torch.sum(is_equal).cpu().item()denominator = torch.sum(target_masks.to(dtype=is_equal.dtype)).cpu().item()return numerator, denominatordef token_accuracy(score, target_labels, target_masks):"""计算token的准确率:param score: [N,T,num_class] 模型输出的置信度:param target_labels: [N,T] 实际样本的标签id:param target_masks: [N,T] 填充mask,实际值的地方为1,填充值的地方为0:return: float 准确率"""numerator, denominator = calc_token_accuracy_number(score, target_labels, target_masks)if denominator <= 0.0:return 0.0return numerator / denominator
9. 训练函数
# -*- coding: utf-8 -*-
import osimport torch
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriterfrom medical_ner.datas.bmeso_ner_labels import load_label_categories
from medical_ner.datas.json_ner_dataloader import create_train_dataloader, create_eval_dataloader
from medical_ner.losses.loss import CustomCrossEntropyLoss
from medical_ner.metrics.accuracy import token_accuracy, token_accuracy_v2, \calc_token_accuracy_number_v2, div
from medical_ner.models.ner_bert_softmax import TransformerEncoderSoftmaxNerModel
from transformers import BertTokenizerdef train(train_data_path, eval_data_path, bert_tokenizer_path, label_path,model_save_path, summary_log_dir,batch_size=8, total_epoch=10,hidden_size=128, encoder_num_head=4, encoder_num_layers=2,target_padding_idx=-100,lr=0.01
):"""训练一个基于 Transformer 编码器的命名实体识别(NER)模型。:param train_data_path: 训练数据文件路径。:param eval_data_path: 评估数据文件路径。:param bert_tokenizer_path: BERT 分词器的预训练模型路径。:param label_path: 标签类别映射文件路径。:param model_save_path: 模型保存路径。:param summary_log_dir: TensorBoard 日志保存路径。:param batch_size: 每个批次的样本数量,默认为 8。:param total_epoch: 总训练轮数,默认为 10。:param hidden_size: 隐藏层维度,默认为 128。:param encoder_num_head: Transformer 编码器的头数,默认为 4。:param encoder_num_layers: Transformer 编码器的层数,默认为 2。:param target_padding_idx: 目标标签的填充索引,默认为 -100。:param lr: 学习率,默认为 0.01。"""# 创建模型保存路径和日志路径的目录,如果不存在则创建os.makedirs(model_save_path, exist_ok=True)os.makedirs(summary_log_dir, exist_ok=True)# 依赖信息的加载'''使用 BertTokenizer 加载预训练的 BERT 分词器。加载标签类别映射(从文件中读取)。定义保存最优模型和最后一个模型的路径。'''tokenizer = BertTokenizer.from_pretrained(bert_tokenizer_path)label_categories = load_label_categories(label_path)best_dump_path = os.path.join(model_save_path, "best.pkl")last_dump_path = os.path.join(model_save_path, "last.pkl")# 1. 数据加载及解析train_dataloader = create_train_dataloader(train_data_path, tokenizer, label_categories, batch_size,target_padding_idx=target_padding_idx)eval_dataloader = create_eval_dataloader(eval_data_path, tokenizer, label_categories, batch_size * 2,target_padding_idx=target_padding_idx)# 2. 网络构造、损失函数构造、优化器构造#初始化 NER 模型(基于 Transformer 编码器)net = TransformerEncoderSoftmaxNerModel(vocab_size=tokenizer.vocab_size,hidden_size=hidden_size,encoder_num_head=encoder_num_head,encoder_num_layers=encoder_num_layers,num_class=len(label_categories))# 需要安装一个tensorboard库,一般情况下安装tensorflow的时候,会自动安装这个库# pip install tensorboard==2.12.3# 查看可视化页面,在命令行,执行以下命令:# tensorboard --logdir log_dir# tensorboard --logdir D:\workspaces\study\NLP04\projects\NamedEntityRecognition\medical_ner\test\output\medical\ner_softmax\logs# 初始化 TensorBoard 日志记录器writer = SummaryWriter(log_dir=summary_log_dir)# writer.add_graph(net, input_to_model=torch.randint(0, 100, (4, 20)))# 定义损失函数和优化器loss_fn = CustomCrossEntropyLoss(ignore_index=target_padding_idx, summary_writer=writer)train_fn = optim.SGD(params=net.parameters(), lr=lr)# 3/4/5. 遍历训练、评估、持久化# 初始化最佳准确率best_acc = float('-inf')#开始训练循环,遍历指定的总轮数。for epoch in range(total_epoch):# 当前epoch模型训练net.train()train_fn.zero_grad()for batch, (x, mask, y) in enumerate(train_dataloader):# a. 前向过程score = net(token_ids=x, token_masks=mask) # score: [N,T,num_class]loss = loss_fn(torch.permute(score, dims=(0, 2, 1)), y)acc1 = token_accuracy(score, y, target_masks=mask)negative_acc, positive_acc, acc2 = token_accuracy_v2(score, y, target_masks=mask)# b. 反向更新loss.backward()train_fn.step()train_fn.zero_grad()# c. 日志的输出/运行过程的输出writer.add_scalar('train_loss', loss.item())writer.add_scalars('train_acc', {'acc': acc2,'positive': positive_acc,'negative': negative_acc})print(f"Epoch {epoch}/{total_epoch} Batch {batch} Loss {loss.item():.5f} "f"Token Accuracy {acc1:.3f} - {negative_acc:.3f} - {positive_acc:.3f} - {acc2:.3f}")# 当前epoch的模型评估&持久化net.eval()with torch.no_grad():eval_acc_number = [0.0, 0.0, 0.0, 0.0]for _, (x, mask, y) in enumerate(eval_dataloader):score = net(token_ids=x, token_masks=mask) # score: [N,?,num_class]acc_number = calc_token_accuracy_number_v2(score, y, mask)for i in range(4):eval_acc_number[i] += acc_number[i]eval_negative_acc = div(eval_acc_number[0], eval_acc_number[1])eval_positive_acc = div(eval_acc_number[2], eval_acc_number[3])eval_acc = div(eval_acc_number[0] + eval_acc_number[2], eval_acc_number[1] + eval_acc_number[3])print(f"Epoch {epoch}/{total_epoch} "f"Eval Token Accuracy {eval_negative_acc:.3f} - {eval_positive_acc:.3f} - {eval_acc:.3f}")writer.add_scalars('eval_acc', {'acc': eval_acc,'positive': eval_positive_acc,'negative': eval_negative_acc}, global_step=epoch)# 模型持久化save_obj = {'net': net.state_dict(),'epoch': epoch,'best_accuracy': eval_acc}if eval_acc > best_acc:print(f"保存最优模型为 {epoch} {eval_acc} {best_dump_path}")torch.save(save_obj, best_dump_path)best_acc = eval_acctorch.save(save_obj, last_dump_path)# 训练完成后,关闭相关资源writer.close()
10. 主函数
if __name__ == '__main__':bert_tokenizer_path = r"D:\cache\huggingface\hub\models--bert-base-chinese\snapshots\c30a6ed22ab4564dc1e3b2ecbf6e766b0611a33f"train_data_path = "./datas/medical/training.txt"eval_data_path = "./datas/medical/test.json"label_path = "./datas/medical/categories.json"model_save_path = "./output/medical/ner_softmax/models"summary_log_dir = "./output/medical/ner_softmax/logs"train(train_data_path, eval_data_path, bert_tokenizer_path, label_path,model_save_path, summary_log_dir)