Datawhale AI夏令营 AI+逻辑推理 Task2总结
一、大语言模型解题方案介绍
1.1 大模型推理介绍
推理是建立在训练完成的基础上,将训练好的模型应用于新的、未见过的数据,模型利用先前学到的规律进行预测、分类和生成新内容,使得AI在实际应用中能够做出更有意义的决策。
1.2 大模型推理实现最常用方法——提示工程(Prompt Engineering)
提示工程是一门较新的学科,关注提示词开发和优化,帮助用户将大语言模型用于各场景和研究领域。研究人员可利用提示工程来提升大语言模型处理复杂任务场景的能力,如问答和算术推理能力。开发人员可通过提示工程设计、研发强大的工程技术,实现和大语言模型或其他生态工具的高效接轨。
提示工程不仅仅是关于设计和研发提示词。它包含了与大语言模型交互和研发的各种技能和技术。提示工程在实现和大语言模型交互、对接,以及理解大语言模型能力方面都起着重要作用。用户可以通过提示工程来提高大语言模型的安全性,也可以赋能大语言模型,比如借助专业领域知识和外部工具来增强大语言模型能力。
本赛题中,通过数据处理,将问题字典转化为MD格式的prompt,这就是让大语言模型能理解并作分析的秘诀。
二、Baseline整体代码介绍
整体代码主要包括答案生成和纠错与结果文件生成两大模块。
答案生成部分包括大模型的处理函数、大模型返回结果抽取、多线程处理及答案生成的启动,代码核心是大模型部分。
纠错与结果生成部分存在的目的是由于目前使用了API调用在线开源大模型,因为网络、模型能力等原因会导致有一些结果会出现缺失。(比如大模型回答时,没有明确给出ABCD的结果,而返回的空值。也有时因为网络retry模块机会使用结束后,依然没有提取到结果会跳过某个问题。)
2.1 环境配置
导入需要的环境,包括日志处理、多线程、API请求等相关库引入。
import json
import os
from pprint import pprint
import re
from tqdm import tqdm
import randomimport uuid
import openai
import tiktoken
import json
import numpy as np
import requests
from retry import retry
from scipy import sparse
from http import HTTPStatus
import dashscopefrom concurrent.futures import ThreadPoolExecutor, as_completed
from loguru import logger
import json
import time
from tqdm import tqdmlogger.remove() # 移除默认的控制台输出
logger.add("logs/app_{time:YYYY-MM-DD}.log", level="INFO", rotation="00:00", retention="10 days", compression="zip")MODEL_NAME = 'qwen2-7b-instruct'
2.2 答案生成部分
def call_qwen_api(MODEL_NAME, query):# 这里采用dashscope的api调用模型推理,通过http传输的json封装返回结果messages = [{'role': 'user', 'content': query}]response = dashscope.Generation.call(MODEL_NAME,messages=messages,result_format='message', # set the result is message format.)if response.status_code == HTTPStatus.OK:# print(response)return response['output']['choices'][0]['message']['content']else:print('Request id: %s, Status code: %s, error code: %s, error message: %s' % (response.request_id, response.status_code,response.code, response.message))raise Exception()
call_qwen_api
函数就是通过输入模型名称、prompt,完成大模型api的调用。
def api_retry(MODEL_NAME, query):# 最大尝试次数max_retries = 5# 再次尝试等待时间retry_delay = 60 # in secondsattempts = 0while attempts < max_retries:try:return call_qwen_api(MODEL_NAME, query)except Exception as e:attempts += 1 if attempts < max_retries:logger.warning(f"Attempt {attempts} failed for text: {query}. Retrying in {retry_delay} seconds...")time.sleep(retry_delay)else:logger.error(f"All {max_retries} attempts failed for text: {query}. Error: {e}")raise
api_retry
函数是当大模型调用API时可能会导致出错中断的问题,为了保证每个问题都被大模型处理过,我们需要设置一个反复尝试的函数。
def get_prompt(problem, question, options):options = '\n'.join(f"{'ABCDEFG'[i]}. {o}" for i, o in enumerate(options))prompt = f"""你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。请逐步分析问题并在最后一行输出答案,最后一行的格式为"答案是:A"。题目如下:### 题目:
{problem}### 问题:
{question}
{options}
"""# print(prompt)return prompt
get_prompt
是prompt的模板函数,通过字符串处理的方式拼接完整的prompt
# 这里使用extract抽取模获得抽取的结果def extract(input_text):# re.compile()将字符串形式的正则表达式编译为Pattern模式对象,第二个参数是匹配模式ans_pattern = re.compile(r"答案是:(.)", re.S)problems = ans_pattern.findall(input_text)# print(problems)if(problems == ''):return 'A'return problems[0]
通过抽取函数可以将大语言模型生成的结果抽取成答案对应的选项,这里的匹配原则和prompt呼应。我们可以看到prompt要求【最后一行的格式为"答案是:A"】这样的规范,那么我们采用正则表达式re.compile方法匹配到答案对应的选项。当我们匹配为空时,我们默认选"A"。
def process_datas(datas,MODEL_NAME):results = []# 定义线程池 选择16线程with ThreadPoolExecutor(max_workers=16) as executor:# 使用future_data 存储每个线程的数据future_data = {}lasttask = ''lastmark = 0# lens记录了调用api的次数,也就是每个问题背景下的所有子问题之和lens = 0# 进入多线程任务# 这里每个data下是一个问题背景,其中包含多个子问题for data in tqdm(datas, desc="Submitting tasks", total=len(datas)):problem = data['problem']# 用enumerate方法每次循环得到问题的序号id和实际的问题for id, question in enumerate(data['questions']):prompt = get_prompt(problem, question['question'], question['options'],)# 送入线程池等待处理,使用api_retry,向api_retry传入MODEL_NAME,prompt参数future = executor.submit(api_retry, MODEL_NAME, prompt)# 每个线程存储对应的json问题数据以及问题序号id,方便定位出执行的是哪个子问题future_data[future] = (data, id)time.sleep(0.6) # 控制每0.6秒提交一个任务lens += 1# 处理多线程任务for future in tqdm(as_completed(future_data), total=lens, desc="Processing tasks"):# print('data',data)# 取出每个线程中的字典数据及对应的问题iddata = future_data[future][0]problem_id = future_data[future][1]try:# 获取api运行结果res = future.result()# 抽取大语言模型返回结果extract_response = extract(res)# print('res',extract_response)# 装入answer字段data['questions'][problem_id]['answer'] = extract_response# 在结果列表中新增数据字典results.append(data)# print('data',data)except Exception as e:logger.error(f"Failed to process text: {data}. Error: {e}")return results
多线程处理数据
def main(ifn, ofn):if os.path.exists(ofn):passdata = []# 按行读取数据with open(ifn) as reader:for line in reader:sample = json.loads(line)data.append(sample)datas = data# print(data)# 均匀地分成多个数据集return_list = process_datas(datas,MODEL_NAME)print(len(return_list))print("All tasks finished!")return return_listif __name__ == '__main__':a = extract("""根据欧几里得算法,逐步解析计算两个数6和7的最大公约数(gcd)的步骤如下:1. 判断6和7是否相等:不相等。
2. 判断6和7大小关系,7 > 6,所以用更大的数7减去较小的数6得到结果1。
3. 现在计算6和1的最大公约数。
4. 6 > 1,根据算法用更大的数6减去较小的数1得到结果5。
5. 再计算5和1的最大公约数。
6. 5 > 1,用5减去1得到结果4。
7. 再计算4和1的最大公约数。
8. 4 > 1,用4减去1得到结果3。
9. 再计算3和1的最大公约数。
10. 3 > 1,用3减去1得到结果2。
11. 再计算2和1的最大公约数。
12. 2 > 1,用2减去1得到结果1。
13. 最后计算1和1的最大公约数,两数相等,gcd即为这两个数,也就是1。因此,6和7的最大公约数是1。答案是:C.""")print(a)# 调用主函数return_list = main('round1_test_data.jsonl', 'upload.jsonl')
启动函数
2.3 纠错与结果文件生成部分
def has_complete_answer(questions):# 这里假设完整答案的判断逻辑是:每个question都有一个'answer'键for question in questions:if 'answer' not in question:return Falsereturn Truedef filter_problems(data):result = []problem_set = set()for item in data:# print('处理的item' ,item)problem = item['problem']if problem in problem_set:# 找到已存在的字典for existing_item in result:if existing_item['problem'] == problem:# 如果当前字典有完整答案,替换已存在的字典if has_complete_answer(item['questions']):existing_item['questions'] = item['questions']existing_item['id'] = item['id']breakelse:# 如果当前字典有完整答案,添加到结果列表if has_complete_answer(item['questions']):result.append(item)problem_set.add(problem)return resultreturn_list
return_list = filter_problems(return_list)
sorted_data = sorted(return_list, key=lambda x: int(str(x['id'])[-3:]))
print(sorted_data)
将一个问题背景下的所有问题存入同一个字典,并按id序号排序。
def find_missing_ids(dict_list):# 提取所有序号extracted_ids = {int(d['id'][-3:]) for d in dict_list}# 创建0-500的序号集合all_ids = set(range(500))# 找出缺失的序号missing_ids = all_ids - extracted_idsreturn sorted(missing_ids)# 示例字典列表
dict_list = sorted_data# 找出缺失的序号
missing_ids = find_missing_ids(dict_list)
print("缺失的序号:", missing_ids)len(missing_ids)
纠错函数
data = []
with open('round1_test_data.jsonl') as reader:for id,line in enumerate(reader):if(id in missing_ids):sample = json.loads(line)for question in sample['questions']:question['answer'] = 'A'sorted_data.append(sample)
sorted_data = sorted(sorted_data, key=lambda x: int(str(x['id'])[-3:]))
补错函数,针对空缺的列表我们进行补错,让每个answer字段默认填充为A。
with open('upload.jsonl', 'w') as writer:for sample in sorted_data:writer.write(json.dumps(sample, ensure_ascii=False))writer.write('\n')
生成结果文件并存储
三、Baseline改进
- 原baseline中针对空缺的列表补错时,只是将每个answer字段默认填充为A,为了提高准确率,这里把它送入多线程函数再处理一遍
- 改进prompt