说明
在重塑实体识别4中梳理了数据流,然后我发现pipeline的串行效率太低了,所以做了并行化改造。里面还是有不少坑的,记录一下。
内容
1 pipeline
官方的pipeline看起来的确是比较好用的,主要是实现了比较好的数据预处理。因为在训练/使用过程中都要进行数据的令牌化与反令牌化,有些字符会被特殊处理,例如 '##A’等。
在使用过程中,我用200条新闻数据进行测试,用pipeline方法花了11分钟处理完毕,期间CUDA的使用率大约为10%。按此估算,即使用多接口并行的方式,那么一分钟最多处理2000条,一天最多处理0.14*2000~30万条数据。这个效率太低了。
2 并行化
最终的结论是不到30秒处理200条,显存只占用2.6G,理论上可以支持3个服务并行(以确保GPU的完全利用)。按最保守的估计,改造后的并行化应该可以提升3倍的效率,稍微激进一点,可以提升10倍的效率。这个之后可以进行测试。
一些主要的点如下
2.1 结果解析
结果可以分为:
- 1 仅含解析出的实体列表,用逗号连接字符串表示。
- 2 含实体及其起始位置的表示,这个用于标注反馈、二次增强处理。
- 3 仅含BIO标签,主要用于和测试数据进行效果比对。
对应的相关函数,看起来有点繁杂,我自己都不太想看第二眼。
from datasets import ClassLabel
# 定义标签列表
label_list = ['B', 'I', 'O']
# 创建 ClassLabel 对象
class_label = ClassLabel(names=label_list)
def convert_entity_label_batch(x):x1 = xreturn class_label.int2str(x1)
# 定义函数将整数Tensor转换为字符串 | 反令牌函数,但是用不上;因为predict label列表的长度和 ss_padding相同
def tensor_to_string(tensor, tokenizer = None , skip_special_tokens = True):return tokenizer.decode(tensor.tolist(), skip_special_tokens=skip_special_tokens).replace(' ','')from datasets import ClassLabel
def detokenize(word_piece):"""将 WordPiece 令牌还原为原始句子。"""if word_piece.startswith('##'):x = word_piece[2:]else:x = word_piecereturn x
import re
def extract_bio_positions(bio_string):pattern = re.compile(r'B(I+)(O|$)')matches = pattern.finditer(bio_string)results = []for match in matches:start, end = match.span()results.append((start, end - 1)) # end-1 to include the last 'I'return results# 0.1ms
def parse_ent_pos_map_batch(some_dict = None):word_list = some_dict['token_words']label_list = [int(x) for x in list(some_dict['label_list'])]min_len = min(len(word_list),len(label_list))word_list = word_list[:min_len]label_list = label_list[:min_len]label_list1 = list(map(convert_entity_label_batch,label_list))oriword_list1 = list(map(detokenize,word_list))ori_word_str =''.join(oriword_list1)# 补到等长label_str = ''for i in range(len(label_list1)):len_of_ori_word = len(oriword_list1[i])if len_of_ori_word == 1:tem_str = label_list1[i]else:if label_list1[i] in ['I','O']:tem_str = label_list1[i] * len_of_ori_wordelse:tem_str = 'B' + 'I' * (len_of_ori_word -1) label_str += tem_strpos_list = extract_bio_positions(label_str)part_ent_list = [(ori_word_str[x[0]:x[1]] , *x) for x in pos_list]return part_ent_list# =============
def make_BIO_by_len(some_len):default_str = 'I' * some_lenstr_list = list(default_str)str_list[0] ='B'return str_list
def gen_BIO_list2(some_dict):the_content = some_dict['clean_data']ent_list = some_dict['ent_tuple_list']content_list = list(the_content)tag_list = list('O'* len(content_list))for ent_info in ent_list:start = ent_info[1]end = ent_info[2]label_len = end-starttem_bio_list = make_BIO_by_len(label_len)tag_list[start:end] = tem_bio_listres_dict = {}res_dict['x'] = content_listres_dict['y'] = tag_listreturn res_dictdef trim_len(some_dict = None):padding_BIO = some_dict['padding_BIO']ss_len = some_dict['ss_len']return padding_BIO[:ss_len]
2.2 批量预测
看起来同样很繁杂,但是不得不细看。首先,数据会按照几个长度 20,50,198分为三部分处理,batch_predict每次仅处理一个批次。在这里,将数据转为定长的令牌长度,然后转入CUDA进行批量预测。结果再按照实体-位置 tuple, 实体列表和BIO三种方式进行解析。
from functools import partial
import transformers
import torch
from transformers import AutoModelForMaskedLM, AutoTokenizer,AutoModelForTokenClassification
from functools import partial
# some_batch 是原文经过padding的数据,['ss_hash','ss','ss_len', 'ss_padding'], 其中ss_padding的长度是固定的
# 模型文件和令牌文件都放在model_path之下,model比较大,避免重载;而tokenize会有padding过程,必须重载
# 模型先载入cuda
def batch_predict(some_batch, ss_padding_len = None, model = None, model_path = None):# 因为tokenize会在令牌的前后加上分隔令牌,所以+2if ss_padding_len is None:ss_padding_len = some_batch['ss_padding'].apply(len).max()print('ss_padding_len is %s ' % ss_padding_len)max_len = ss_padding_len+2tokenizer = AutoTokenizer.from_pretrained(model_path)tencoder = partial(tokenizer.encode,truncation=True, max_length=max_len, is_split_into_words=True, return_tensors="pt", padding='max_length')some_batch['ss_padding_token'] = some_batch['ss_padding'].apply(list).apply(tencoder)# 构成矩阵minput = torch.cat(list(some_batch['ss_padding_token'].values))# 将数据搬到GPU中处理再返回with torch.no_grad():input_cuda = minput.to(device)outputs_cuda = model(input_cuda).logitspredictions = torch.argmax(outputs_cuda, dim=2)predictions_list = list(predictions.to('cpu').numpy())predict_list1 = []for predictions in predictions_list:tem_pred_tag = [int(x) for x in predictions[1:-1]]predict_list1.append(tem_pred_tag)some_batch['label_list'] = predict_list1_s = cols2s(some_df =some_batch, cols= ['ss_padding','label_list'], cols_key_mapping= ['token_words', 'label_list'])_s1 = _s.apply(parse_ent_pos_map_batch)some_batch['ent_tuple_list'] = list(_s1)some_batch['ent_list'] = some_batch['ent_tuple_list'].apply(lambda x: ','.join([a[0] for a in x ]))_s = cols2s(some_batch, cols= ['ss_padding', 'ent_tuple_list'], cols_key_mapping= ['clean_data', 'ent_tuple_list'])s1 = _s.apply(gen_BIO_list2)ent_tuple_res_df1 = pd.DataFrame(s1.to_list())some_batch['padding_BIO'] = list(ent_tuple_res_df1['y'].apply(lambda x: ''.join(x)))_s00 = cols2s(some_batch, cols = ['ss_len', 'padding_BIO'], cols_key_mapping=['ss_len', 'padding_BIO'])some_batch['BIO'] = list(_s00.apply(trim_len))return some_batch
3 迭代器
在推送数据处理时,可以采用迭代器来控制不同的批次数据
# 迭代器切分
import pandas as pd
class DataFrameBatchIterator:def __init__(self, dataframe, batch_size):self.dataframe = dataframeself.batch_size = batch_size# 【我增加的】self.fail_batch_list = []def __iter__(self):num_rows = len(self.dataframe)num_batches = (num_rows - 1) // self.batch_size + 1for i in range(num_batches):start_idx = i * self.batch_sizeend_idx = (i + 1) * self.batch_sizebatch_data = self.dataframe.iloc[start_idx:end_idx]yield batch_data# 【我增加的】def clear_fail(self):self.fail_batch_list = []# 【我增加的】def get_some_batch(self, batch_idx):return self.dataframe.iloc[self.batch_size * batch_idx: self.batch_size * (batch_idx + 1)]# 【我增加的】记录失败的批次def rec_fail_batch_idx(self, batch_idx):self.fail_batch_list.append(batch_idx)
# 创建一个示例 DataFrame
data = {'Name': ['John', 'Jane', 'Mike', 'Alice', 'Bob'],'Age': [25, 30, 35, 28, 32],'City': ['New York', 'Paris', 'London', 'Tokyo', 'Sydney']}
df = pd.DataFrame(data)
# 创建 DataFrame 迭代器
batch_iterator = DataFrameBatchIterator(df, batch_size=2)
import tqdm
# 使用迭代器逐批次处理数据
for i,batch in tqdm.tqdm(enumerate(batch_iterator)):try:# 在这里可以对当前批次的数据进行相应的操作# 例如进行数据清洗、特征处理、模型训练等# 示例:打印当前批次的数据
# raise Exception(e) print(batch)except:print('>>> %s Fail' % i)batch_iterator.rec_fail_batch_idx(i)
以下是实际的调度
# 假设处理长度为1万的句子
# 20 * 2000 ~ 4w
# 50 * 800 ~ 4w
# 200 * 200 ~ 4w
import warnings
warnings.filterwarnings('ignore')
batch_slice_para = {20:2000, 50:800, 200:200}
batch_len_list = sorted(list(batch_slice_para.keys()))
batch_len_list.insert(0,0)batch_df_list = []
for i in range(len(batch_len_list)):if i >0:sel = (ss_df['ss_len'] >=batch_len_list[i-1]) & (ss_df['ss_len'] < batch_len_list[i])if sel.sum():padding_len = batch_len_list[i]padding_batch = batch_slice_para[padding_len]tem_df= ss_df[sel]# tem_df['ss_padding'] = tem_df['ss'].apply(lambda x: x.ljust(padding_len,'a'))tem_df['ss_padding'] = tem_df['ss']tem_df_iterator = DataFrameBatchIterator(tem_df, padding_batch)batch_df_list.append(tem_df_iterator)else:batch_df_list.append(None)
对每个批次执行处理,载入模型
label_list = ['B','I','O']
model_checkpoint = 'model03'
model = AutoModelForTokenClassification.from_pretrained(model_checkpoint, num_labels=len(label_list))device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Device: %s' % device)
# 自动切换设备
if model.device.type != device:model.to(device)print('>>> 检测到模型设备与当前指定不一致,切换 %s' % device )
else:print('>>> 模型设备一致,不切换 %s' % device)
分批次预测(主要是确保显存不溢出)
batch_res_list = []
for some_iter in batch_df_list:for some_batch in some_iter:batch_res = batch_predict(some_batch, model = model, model_path = 'model03')batch_res_list.append(batch_res)
结果合并
batch_res_df = pd.concat(batch_res_list, ignore_index= True)
mdf = pd.merge(input_df , batch_res_df[['ss_hash', 'ent_list']],how='left', on ='ss_hash')
4 总结
一个在理论上证明可以显著提升效率的点在于,模型进行实体识别时先切分了短句,然后按短句进行了去重:相同短句的实体结果一定是相同的。
实验中,200条新闻产生了约5万个短句,去重后只剩下约3.5万。所以即使在这一步也是有提升的。当然,这种方式同样也可以被用于pipeline。
还有就是在处理填充时,并不按照最大长度统一填充。而是按照句子长度的统计特性分为了短、中、长三种方式。从统计上看,约70%的短句长度是在20个字符以内的,真正超过50个字符的短句(中间无分隔符),即使从语法上来看也是比较奇怪的。
这样在填充数据时浪费就比pipeline要小,同样显存可以装下更多的数据。