命名实体识别(NER)
命名实体识别(Named Entity Recognition, NER)是自然语言处理(NLP)中的一个关键任务,其目标是从文本中识别出具有特定意义的实体,并将其分类到预定义的类别中。这些实体可以是人名、地名、组织机构名、日期时间、货币金额等。
- 主要功能:
- 实体识别:从文本中找出所有可能的命名实体。
- 实体分类:将识别出来的实体归类到预先定义好的类别中,如人名、地名、组织名等。
- 边界检测:确定每个实体在文本中的起始和结束位置。
- 应用场景:
- 信息检索:帮助搜索引擎理解查询意图,提供更精准的搜索结果。
- 问答系统:辅助解析用户问题,提高答案的准确性。
- 机器翻译:保留原文中的专有名词不被翻译,或根据上下文正确翻译。
- 数据挖掘:从大量文本数据中提取有价值的信息,如市场分析、舆情监控等。
- 个性化推荐:通过分析用户的兴趣点,提供个性化的服务和内容。
更多细节可以参考:命名实体识别综述。
本文目标
- 从公开的新闻报道
标题
中提取地名
,这里的地名主要是一些国家名称
。 - 使用预训练的中文Bert模型,
bert-base-chinese
。 - 数据集的标注方式为BIO。
获取数据集
我们直接抓取漂亮国的中文发布网站的数据。
这里,我把数据存在PostgreSQL数据库里面,我建议大家安装一个数据库,非常方便数据抓取。
import time
import requests
import pandas as pd
from sqlalchemy import create_engine
from tqdm import tqdm
from bs4 import BeautifulSoupuser = 'postgres'
password = '你的密码'
db_name = '你的数据库名称'
db_url = f'postgresql://{user}:{password}@localhost:5432/{db_name}'
engine = create_engine(db_url)def get_title(url):res = requests.get(url, headers=headers)try:txt = res.content.decode('gbk')except UnicodeDecodeError as e:txt = res.content.decode('utf-8')soup = BeautifulSoup(txt, 'lxml') data = []for li in soup.find_all('li', class_='collection-result'):try:href = li.find('a')['href']except:href = '无数据'try:title = li.find('a').text.replace('\n','').replace('\t','')except:title = '无数据'try:date = li.find('div').text.replace('\n','').replace('\t','')except:date = '无数据'data.append([href, title, date])return pd.DataFrame(data, columns=['href','title','date'])def get_news(url):res = requests.get(url, headers=headers)try:txt = res.content.decode('gbk')except UnicodeDecodeError as e:txt = res.content.decode('utf-8')soup = BeautifulSoup(txt, 'lxml')data = []for div in soup.find_all('div', class_='entry-content'):try:text = '\n'.join([p.get_text(strip=True) for p in div.find_all('p')[:-2]])except:text = '无数据'data.append({'href': url, 'text': text})return pd.DataFrame(data)headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0','cookie':'自己去网站找'
}# 这里是抓取对应标题和url
for i in range(53): # 页数url = f'https://www.state.gov/translations/chinese/page/{i}/'df = get_title(url)print(f'正在抓取: {url}, 数据长度: {len(df)}')df.to_sql('mfa_usa', con=engine, if_exists='append', index=False)time.sleep(30)# 这里是抓取完整的报道
df = pd.read_sql('select * from mfa_usa', con=engine)
pbar = tqdm(list(df.href)[10:])
for url in pbar:pbar.set_description('Processing %s')df0 = get_news(url)df0.to_sql('mfa_usa_news', con=engine, if_exists='append', index=False)time.sleep(4)
- 标题
- 全文
- 一共是500+的数据,差不多了,标注也挺麻烦的。
标注数据集
因为我的任务是提取地名,所以使用比较简单的BIO进行:
- B-NP:开头
- I-NP:中间
- O:不是需要识别的词/字
这里推荐一个开源的NLP标注工具:MarkStudio。
第一步,转换数据格式
下载好之后,打开exe就可以导入自己的数据开始标注,但是数据必须以txt的形式导入
,如下图所示。下面是简单的处理脚本
。
import pandas as pddf = pd.read_csv('data/data.csv')# 将每一行数据写入txt文件
txt_file = 'data/ner_label_in.txt'
with open(txt_file, 'w', encoding='utf-8') as f:for index, row in df.iterrows():f.write(row['text'] + '\n') #
print(f"数据已成功写入 {txt_file} 文件!")
第二步,定义标签组
待标注数据准备好之后,我们打开标注工具,然后自定义标签(你也可以使用该工具自带的标签),如下图。
第三步,创建标注工程
回到工程管理,新建工程,然后导入待标注的txt文件
,如下图。
- 建工程
- 导数据
第四步,标注实体
切换到工作台,就可以开始标注数据。
鼠标选中需要标的字或词,他会自动弹出我们预先选择的实体类型,如下图。
第五步,导出标注数据
该工具导出的标注数据为json格式。所以我后面在进行实验时,进行了预处理。
回到工程管理,点击导出数据即可,如下图。
我们就导出已经标注的数据。
微调Bert
数据预处理
import json
from sklearn.model_selection import train_test_split
from datasets import Dataset, DatasetDict# 来自标注好的JSON文件
with open(LABEL_DATA_PATH, 'r', encoding='utf-8') as f:data = json.load(f)texts = []
labels = []for entry in data:text = entry['content']label_sequence = ['O'] * len(text) # 初始化所有字符的标签为 'O'for tag in entry['tags']:if tag['name'] == 'PLACE':start = tag['start']end = tag['end']# 将开始位置标记为 'B-PLACE'label_sequence[start] = 'B-PLACE'# 将后续位置标记为 'I-PLACE'for i in range(start + 1, end):label_sequence[i] = 'I-PLACE'# 将标签转换为标签索引label_indices = [label2id[label] for label in label_sequence]texts.append(text)labels.append(label_indices)# 检查转换后的格式
print("Texts:", texts[-2:])
print("Labels:", labels[-2:])# 划分数据集--训练测试和验证
texts_train, texts_temp, labels_train, labels_temp = train_test_split(texts, labels, test_size=0.2, random_state=42
)
texts_val, texts_test, labels_val, labels_test = train_test_split(texts_temp, labels_temp, test_size=0.5, random_state=42
)# 构造字典形式的数据
def create_dataset(texts, labels):ids = list(range(len(texts)))tokens_list = [list(text) for text in texts]return {'id': ids, 'tokens': tokens_list, 'ner_tags': labels}train_data = create_dataset(texts_train, labels_train)
val_data = create_dataset(texts_val, labels_val)
test_data = create_dataset(texts_test, labels_test)# 创建 Dataset 和 DatasetDict
train_dataset = Dataset.from_dict(train_data)
val_dataset = Dataset.from_dict(val_data)
test_dataset = Dataset.from_dict(test_data)# 最终的数据集
ner_data = DatasetDict({'train': train_dataset,'validation': val_dataset,'test': test_dataset
})
编码文本
from transformers import BertTokenizerFastdef tokenize_and_align_labels(examples, label_all_tokens=True):tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)labels = []for i, label in enumerate(examples["ner_tags"]):word_ids = tokenized_inputs.word_ids(batch_index=i)# word_ids() => Return a list mapping the tokens# to their actual word in the initial sentence.# It Returns a list indicating the word corresponding to each token.previous_word_idx = Nonelabel_ids = []# Special tokens like `` and `<\s>` are originally mapped to None# We need to set the label to -100 so they are automatically ignored in the loss function.for word_idx in word_ids:if word_idx is None:# set –100 as the label for these special tokenslabel_ids.append(-100)# For the other tokens in a word, we set the label to either the current label or -100, depending on# the label_all_tokens flag.elif word_idx != previous_word_idx:# if current word_idx is != prev then its the most regular case# and add the corresponding tokenlabel_ids.append(label[word_idx])else:# to take care of sub-words which have the same word_idx# set -100 as well for them, but only if label_all_tokens == Falselabel_ids.append(label[word_idx] if label_all_tokens else -100)# mask the subword representations after the first subwordprevious_word_idx = word_idxlabels.append(label_ids)tokenized_inputs["labels"] = labelsreturn tokenized_inputstokenizer = BertTokenizerFast.from_pretrained(MODEL_PATH+MODEL_NAME) # 自己下载的中文 BERT 模型
# 应用于整个数据
tokenized_datasets = ner_data.map(tokenize_and_align_labels, batched=True)
定义模型
from torch.optim import AdamW
from transformers import Trainer, TrainingArguments
from transformers import DataCollatorForTokenClassification# 初始化模型
model = AutoModelForTokenClassification.from_pretrained(MODEL_PATH+MODEL_NAME, num_labels=NUM_LABELS)
构建Trainer
from torch.optim import AdamW
from transformers import Trainer, TrainingArguments
from transformers import DataCollatorForTokenClassificationdef calculate_ner_metrics(true_labels, pred_labels):"""自定义评估函数,输入为二维列表,输出为各指标"""assert len(true_labels) == len(pred_labels), "true_labels 和 pred_labels 的长度必须一致"# 初始化统计变量total_true = 0 # 总的真实实体数total_pred = 0 # 总的预测实体数total_correct = 0 # 预测正确的实体数total_tokens = 0 # 总的标注的token数correct_tokens = 0 # 预测正确的token数# 遍历每个序列for true_seq, pred_seq in zip(true_labels, pred_labels):assert len(true_seq) == len(pred_seq), "每个序列的长度必须一致"for true, pred in zip(true_seq, pred_seq):# 统计 token-level 准确性total_tokens += 1if true == pred:correct_tokens += 1# 如果是实体标签,更新统计if true != "O": # 真实标签为实体total_true += 1if true == pred: # 预测正确的实体total_correct += 1if pred != "O": # 预测标签为实体total_pred += 1# 计算指标accuracy = correct_tokens / total_tokens if total_tokens > 0 else 0.0precision = total_correct / total_pred if total_pred > 0 else 0.0recall = total_correct / total_true if total_true > 0 else 0.0f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0metrics = {"accuracy": accuracy,"precision": precision,"recall": recall,"f1_score": f1}return metricsdef compute_metrics(pred):pred_logits, labels = predpred_logits = pred_logits.argmax(-1)# 取去除 padding 的部分predictions = [[id2label[eval_preds] for (eval_preds, l) in zip(prediction, label) if l != -100]for prediction, label in zip(pred_logits, labels)]true_labels = [[id2label[l] for (eval_preds, l) in zip(prediction, label) if l != -100]for prediction, label in zip(pred_logits, labels)]result = calculate_ner_metrics(true_labels,predictions)return result# 重写 Trainer 类
class CustomTrainer(Trainer):def create_optimizer(self):if self.optimizer is None:# 获取模型参数decay_parameters = [p for n, p in self.model.named_parameters() if n.endswith("weight")]no_decay_parameters = [p for n, p in self.model.named_parameters() if n.endswith("bias")]# 将参数分组optimizer_grouped_parameters = [{"params": decay_parameters, "weight_decay": self.args.weight_decay},{"params": no_decay_parameters, "weight_decay": 0.0},]# 使用 AdamW 作为优化器self.optimizer = AdamW(optimizer_grouped_parameters, lr=self.args.learning_rate)return self.optimizer# 创建训练参数
training_args = TrainingArguments(output_dir=OUT_DIR,eval_strategy="epoch",save_strategy="epoch",learning_rate=2e-5,per_device_train_batch_size=BATCH_SIZE,per_device_eval_batch_size=BATCH_SIZE,num_train_epochs=3,weight_decay=0.01,load_best_model_at_end=True,logging_dir=LOG_DIR,save_total_limit=1,
)# 数据收集器,用于将数据转换为模型可接受的格式
data_collator = DataCollatorForTokenClassification(tokenizer) # 定义 Trainer
trainer = CustomTrainer(model=model, # 替换为你的模型args=training_args,train_dataset=tokenized_datasets['train'],eval_dataset=tokenized_datasets['validation'],data_collator=data_collator,tokenizer=tokenizer,compute_metrics=compute_metrics,
)
训练
# 训练 model
trainer.train()# 保存模型
best_ckpt_path = trainer.state.best_model_checkpoint
best_ckpt_path
评估
trainer.evaluate(eval_dataset=tokenized_datasets['test'])
结果
- 训练过程
- 测试集
- 预测
完整代码和数据集发布在Github:chinese_ner_place