前言
本文的前两个部分最早是属于此旧文的《学术论文GPT的源码解读与微调:从ChatPaper到七月论文审稿GPT第1版》,但为了每一篇文章各自的内容更好的呈现,于是我今天做了以下三个改动
- 原来属于mamba第五部分的「Mamba近似工作之线性Transformer:从TransnormerLLM到RWKV」,改放到此文中:学术论文GPT的源码解读与微调:从ChatPaper到七月论文审稿GPT第1版
- 把旧文「学术论文GPT的源码解读与微调」中关于chatpaper相关的部分独立抽取出来成本文:学术论文GPT的源码解读与二次开发:从ChatPaper到gpt_academic
- 故旧文「学术论文GPT的源码解读与微调」的标题就改成了:七月论文审稿GPT第1版:通过3万多篇paper和10多万的review数据微调RWKV
如此,mamba那篇解读可以专注mamba的解读,不把过多篇幅放在mamba之外的RWKV上,且原来论文审稿第一版本身微调的RWKV,故刚好需要介绍下RWKV
且对于学术论文GPT的源码解读与微调本来就还得解读下gpt_academic,故把ChatPaper和gpt_academic这两个开源系统独立成本文,也更好
本文
- 第一部分 解读chatpaper:GitHub - kaixindelele/ChatPaper: Use ChatGPT to summarize the arXiv papers. 全流程加速科研,利用chatgpt进行论文全文总结+专业翻译+润色+审稿+审稿回复,1.4和1.5节和我司杜老师共创
第二部分 解读gpt_academic:GitHub - binary-husky/gpt_academic: 为GPT/GLM等LLM大语言模型提供实用化交互接口,特别优化论文阅读/润色/写作体验,模块化设计,支持自定义快捷按钮&函数插件,支持Python和C++等项目剖析&自译解功能,PDF/LaTex论文翻译&总结功能,支持并行问询多种LLM模型,支持chatglm3等本地模型。接入通义千问, deepseekcoder, 讯飞星火, 文心一言, llama2, rwkv, claude2, moss等。
第一部分 ChatPaper:论文对话、总结、翻译
ChatPaper的自身定位是全流程加速科研:论文总结+专业级翻译+润色+审稿+审稿回复,因为论文更多是PDF的格式,故针对PDF的对话、总结、翻译,便不可避免的涉及到PDF的解析
1.1 论文审稿:ChatPaper/ChatReviewerAndResponse
1.1.1 对PDF的解析:ChatReviewerAndResponse/get_paper.py
// 待更
1.1.2 论文审查:ChatReviewerAndResponse/chat_reviewer.py
使用OpenAI的GPT模型进行论文审查的脚本。它首先定义了一个Reviewer类来处理审查工作,然后在if __name__ == '__main__':语句下使用argparse处理命令行参数,并调用chat_reviewer_main函数来开始审查过程
- 导入模块:比如jieba、tenacity等
- 命名元组定义:用于保存与论文审稿相关的参数
ReviewerParams = namedtuple("ReviewerParams",["paper_path","file_format","research_fields","language"], )
- 判断文本中是否包含中文:
def contains_chinese(text):for ch in text:if u'\u4e00' <= ch <= u'\u9fff':return Truereturn False
- 插入句子到文本
主要功能是在给定文本的每隔一定数量的单词或中文字符后插入一个指定的句子。如果文本行包含中文字符,则使用jieba分词工具来切分中文,否则使用空格来切分:def insert_sentence(text, sentence, interval):# 将输入文本按换行符分割成行lines = text.split('\n')# 初始化一个新的行列表new_lines = []# 遍历每一行for line in lines:# 检查行中是否包含中文字符if contains_chinese(line):# 如果是中文,使用jieba分词工具进行分词words = list(jieba.cut(line))# 定义分隔符为空字符(对于中文分词)separator = ''else:# 如果不包含中文,按空格分割行words = line.split()# 定义分隔符为空格(对于英文或其他非中文语言)separator = ' '# 初始化一个新的单词列表new_words = []# 初始化一个计数器count = 0# 遍历当前行的每一个单词for word in words:# 将当前单词添加到新的单词列表new_words.append(word)# 计数器增加count += 1# 检查是否达到了插入句子的间隔if count % interval == 0:# 在达到指定间隔时,将要插入的句子添加到新的单词列表new_words.append(sentence)# 将新的单词列表连接起来,并添加到新的行列表new_lines.append(separator.join(new_words))# 将新的行列表连接起来,返回结果return '\n'.join(new_lines)
- 论文审稿类:定义了一个Reviewer类,包含以下功能:
第一阶段审稿:先是基于论文标题和摘要,选择要审稿的部分
然后分别实现两个函数# 定义Reviewer类 class Reviewer:# 初始化方法,设置属性def __init__(self, args=None):if args.language == 'en':self.language = 'English'elif args.language == 'zh':self.language = 'Chinese'else:self.language = 'Chinese' # 创建一个ConfigParser对象self.config = configparser.ConfigParser()# 读取配置文件self.config.read('apikey.ini')# 获取某个键对应的值 self.chat_api_list = self.config.get('OpenAI', 'OPENAI_API_KEYS')[1:-1].replace('\'', '').split(',')self.chat_api_list = [api.strip() for api in self.chat_api_list if len(api) > 5]self.cur_api = 0self.file_format = args.file_format self.max_token_num = 4096self.encoding = tiktoken.get_encoding("gpt2")def validateTitle(self, title):# 修正论文的路径格式rstr = r"[\/\\\:\*\?\"\<\>\|]" # '/ \ : * ? " < > |'new_title = re.sub(rstr, "_", title) # 替换为下划线return new_title
一个stage_1,主要功能是为了与GPT-3模型进行对话,获取模型对于文章的两个最关键部分的选择意见
一个chat_review,主要功能是调用GPT-3模型进行论文审稿,对输入的文章文本进行审查,并按照预定格式生成审稿意见def stage_1(self, paper):# 初始化一个空列表,用于存储生成的HTML内容htmls = []# 初始化一个空字符串,用于存储文章的标题和摘要text = ''# 添加文章的标题text += 'Title: ' + paper.title + '. '# 添加文章的摘要text += 'Abstract: ' + paper.section_texts['Abstract']# 计算文本的token数量text_token = len(self.encoding.encode(text))# 判断token数量是否超过最大token限制的一半减去800if text_token > self.max_token_num/2 - 800:input_text_index = int(len(text)*((self.max_token_num/2)-800)/text_token)# 如果超出,则截取文本以满足长度要求text = text[:input_text_index]# 设置OpenAI API的密钥openai.api_key = self.chat_api_list[self.cur_api]# 更新当前使用的API索引self.cur_api += 1# 如果当前API索引超过API列表的长度,则重置为0self.cur_api = 0 if self.cur_api >= len(self.chat_api_list)-1 else self.cur_api# 创建与GPT-3的对话消息messages = [{"role": "system","content": f"You are a professional reviewer in the field of {args.research_fields}. "f"I will give you a paper. You need to review this paper and discuss the novelty and originality of ideas, correctness, clarity, the significance of results, potential impact and quality of the presentation. "f"Due to the length limitations, I am only allowed to provide you the abstract, introduction, conclusion and at most two sections of this paper."f"Now I will give you the title and abstract and the headings of potential sections. "f"You need to reply at most two headings. Then I will further provide you the full information, includes aforementioned sections and at most two sections you called for.\n\n"f"Title: {paper.title}\n\n"f"Abstract: {paper.section_texts['Abstract']}\n\n"f"Potential Sections: {paper.section_names[2:-1]}\n\n"f"Follow the following format to output your choice of sections:"f"{{chosen section 1}}, {{chosen section 2}}\n\n"},{"role": "user", "content": text},]# 调用OpenAI API与GPT-3进行对话response = openai.ChatCompletion.create(model="gpt-3.5-turbo",messages=messages,)# 初始化一个空字符串,用于存储模型的回复result = ''# 遍历模型的回复,将其添加到结果字符串中for choice in response.choices:result += choice.message.content# 打印模型的回复print(result)# 返回模型的回复,将其分割为多个部分return result.split(',')
使用ChatGPT进行审稿,且有tenacity重试机制和更多的功能,其中review_by_chatgpt 调用了上面所示的两个函数,一个stage_1,一个chat_reviewdef chat_review(self, text):# 设置OpenAI API的密钥openai.api_key = self.chat_api_list[self.cur_api]# 更新当前使用的API密钥索引self.cur_api += 1# 如果当前API密钥索引超过API密钥列表的长度,则将其重置为0self.cur_api = 0 if self.cur_api >= len(self.chat_api_list)-1 else self.cur_api# 定义用于审稿提示的token数量review_prompt_token = 1000# 计算输入文本的token数量text_token = len(self.encoding.encode(text))# 计算输入文本的截取位置input_text_index = int(len(text)*(self.max_token_num-review_prompt_token)/text_token)# 截取文本并添加前缀input_text = "This is the paper for your review:" + text[:input_text_index]# 从'ReviewFormat.txt'文件中读取审稿格式with open('ReviewFormat.txt', 'r') as file:review_format = file.read()# 创建与GPT-3的对话消息messages=[{"role": "system", "content": "You are a professional reviewer in the field of "+args.research_fields+". Now I will give you a paper. You need to give a complete review opinion according to the following requirements and format:"+ review_format +" Please answer in {}.".format(self.language)},{"role": "user", "content": input_text},]# 调用OpenAI API与GPT-3进行对话response = openai.ChatCompletion.create(model="gpt-3.5-turbo",messages=messages,)# 初始化一个空字符串,用于存储模型的回复result = ''# 遍历模型的回复,将其添加到结果字符串中for choice in response.choices:result += choice.message.content# 在结果中插入特定的句子,警告不允许复制result = insert_sentence(result, '**Generated by ChatGPT, no copying allowed!**', 15)# 追加伦理声明result += "\n\n⚠伦理声明/Ethics statement:\n--禁止直接复制生成的评论用于任何论文审稿工作!\n--Direct copying of generated comments for any paper review work is prohibited!"# 打印分隔符和结果print("********"*10)print(result)print("********"*10)# 打印相关的token使用信息和响应时间print("prompt_token_used:", response.usage.prompt_tokens)print("completion_token_used:", response.usage.completion_tokens)print("total_token_used:", response.usage.total_tokens)print("response_time:", response.response_ms/1000.0, 's')# 返回模型生成的审稿意见return result
def review_by_chatgpt(self, paper_list):# 创建一个空列表用于存储每篇文章审稿后的HTML格式内容htmls = []# 遍历paper_list中的每一篇文章for paper_index, paper in enumerate(paper_list):# 使用第一阶段审稿方法选择文章的关键部分sections_of_interest = self.stage_1(paper)# 初始化一个空字符串用于提取文章的主要部分text = ''# 添加文章的标题text += 'Title:' + paper.title + '. '# 添加文章的摘要text += 'Abstract: ' + paper.section_texts['Abstract']# 查找并添加“Introduction”部分intro_title = next((item for item in paper.section_names if 'ntroduction' in item.lower()), None)if intro_title is not None:text += 'Introduction: ' + paper.section_texts[intro_title]# 同样地,查找并添加“Conclusion”部分conclusion_title = next((item for item in paper.section_names if 'onclusion' in item), None)if conclusion_title is not None:text += 'Conclusion: ' + paper.section_texts[conclusion_title]# 遍历sections_of_interest,添加其他感兴趣的部分for heading in sections_of_interest:if heading in paper.section_names:text += heading + ': ' + paper.section_texts[heading]# 使用ChatGPT进行审稿,并得到审稿内容chat_review_text = self.chat_review(text=text)# 将审稿的文章编号和内容添加到htmls列表中htmls.append('## Paper:' + str(paper_index+1))htmls.append('\n\n\n')htmls.append(chat_review_text)# 获取当前日期和时间,并转换为字符串格式date_str = str(datetime.datetime.now())[:13].replace(' ', '-')try:# 创建输出文件夹export_path = os.path.join('./', 'output_file')os.makedirs(export_path)except:# 如果文件夹已存在,则不执行任何操作pass# 如果是第一篇文章,则写模式为'w',否则为'a'mode = 'w' if paper_index == 0 else 'a'# 根据文章标题和日期生成文件名file_name = os.path.join(export_path, date_str+'-'+self.validateTitle(paper.title)+"."+self.file_format)# 将审稿内容导出为Markdown格式并保存self.export_to_markdown("\n".join(htmls), file_name=file_name, mode=mode)# 清空htmls列表,为下一篇文章做准备htmls = []
- 主程序部分:
定义了一个chat_reviewer_main 函数,该函数创建了一个Reviewer对象,并对指定路径中的PDF文件进行审稿
主程序中定义了命令行参数解析,并调用了chat_reviewer_main 函数def chat_reviewer_main(args): reviewer1 = Reviewer(args=args)# 开始判断是路径还是文件: paper_list = [] if args.paper_path.endswith(".pdf"):paper_list.append(Paper(path=args.paper_path)) else:for root, dirs, files in os.walk(args.paper_path):print("root:", root, "dirs:", dirs, 'files:', files) #当前目录路径for filename in files:# 如果找到PDF文件,则将其复制到目标文件夹中if filename.endswith(".pdf"):paper_list.append(Paper(path=os.path.join(root, filename))) print("------------------paper_num: {}------------------".format(len(paper_list))) [print(paper_index, paper_name.path.split('\\')[-1]) for paper_index, paper_name in enumerate(paper_list)]reviewer1.review_by_chatgpt(paper_list=paper_list)
在主程序中增加了审稿时间的计算功能if __name__ == '__main__': parser = argparse.ArgumentParser()parser.add_argument("--paper_path", type=str, default='', help="path of papers")parser.add_argument("--file_format", type=str, default='txt', help="output file format")parser.add_argument("--research_fields", type=str, default='computer science, artificial intelligence and reinforcement learning', help="the research fields of paper")parser.add_argument("--language", type=str, default='en', help="output lauguage, en or zh")reviewer_args = ReviewerParams(**vars(parser.parse_args()))start_time = time.time()chat_reviewer_main(args=reviewer_args)print("review time:", time.time() - start_time)
当然,这个项目的论文审稿部分更多是用的ChatGPT的API审稿,我司在API的基础上进一步做了微调的工作,比如如何通过论文审阅语料微调出一个论文审稿GPT(甚至通过10万量级的paper+review语料微调/训练),详见本文的第三部分或我司的「大模型项目开发线下营」
1.2 PDF解析:ChatPaper/scipdf_parser-master/
通过这个项目文件:ChatPaper/scipdf_parser-master/scipdf/pdf/parse_pdf.py可以看到以下内容
1.2.1 必要的库、常量、PDF路径
- 导入必要的库
re: 正则表达式库,用于匹配和处理字符串
os 和 os.path: 操作文件和路径的库
glob: 搜索文件的库
urllib: 用于处理和获取 URL
subprocess: 执行外部命令和程序的库
requests: 用于发送 HTTP 请求的库
BeautifulSoup 和 NavigableString: 从 bs4 导入,用于解析和操作 XML/HTML 内容
tqdm 和 tqdm_notebook: 提供进度条功能 - 定义常量
GROBID_URL: GROBID 是一个开源软件,可以从 PDF 文件中提取和解析学术出版物的结构化信息
PDF_FIGURES_JAR_PATH: 这是指向某个 jar 文件的路径,但这段代码中并没有用到这个常量 - 函数 list_pdf_paths: 返回给定文件夹中所有 PDF 文件的路径
- 函数 validate_url: 通过正则表达式验证给定的路径是否为有效的 URL
def validate_url(path: str):"""验证给定的``path``是否为URL"""# 定义正则表达式以匹配URL# 下面的正则表达式主要匹配了以下几部分:# 1. http:// 或 https:// 开头# 2. 域名 (例如:example.com)# 3. localhost (本地主机)# 4. IP地址 (例如:192.168.1.1)# 5. 可选的端口号 (例如::80)# 6. 路径或者查询字符串regex = re.compile(r"^(?:http|ftp)s?://" # http:// or https:// 开头# 域名部分r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|" r"localhost|" # localhost 部分r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})" # IP地址部分r"(?::\d+)?" # 可选的端口号部分r"(?:/?|[/?]\S+)$", # 路径或查询字符串部分re.IGNORECASE, # 忽略大小写)# 使用上述正则表达式匹配给定的path,如果匹配成功则返回True,否则返回Falsereturn re.match(regex, path) is not None
1.2.2 parse_pdf:对PDF的解析
这是代码中的核心功能,用 GROBID 服务从 PDF 文档中解析 XML 或 BeautifulSoup 格式的信息
如果 fulltext 参数为 True,则解析整篇文章;否则,只解析标题
可以从本地或云端的 GROBID 服务中获取数据
def parse_pdf(pdf_path: str,fulltext: bool = True,soup: bool = False,return_coordinates: bool = True,grobid_url: str = GROBID_URL,
):"""使用GROBID工具将PDF解析为XML或BeautifulSoup可以查看http://grobid.readthedocs.io/en/latest/Install-Grobid/了解如何本地运行GROBID加载GROBID zip文件后,可以使用以下方法运行GROBID>> ./gradlew run参数==========pdf_path: str 或 bytes,出版物、文章的路径、URL或PDF的字节字符串fulltext: bool, 解析选项,如果为True,解析文章的全部文本如果为False,只解析头部grobid_url: str, GROBID解析器的url,默认为'http://localhost:8070'可以更改为"https://cloud.science-miner.com/grobid/"使用云服务soup: bool, 如果为True,返回文章的BeautifulSoup输出======parsed_article: 如果soup为False,则返回文本格式的解析后的XML,否则返回XML的BeautifulSoup示例=======>> parsed_article = parse_pdf(pdf_path, fulltext=True, soup=True)"""# GROBID的URLif fulltext:url = "%s/api/processFulltextDocument" % grobid_url # 完整文本处理URLelse:url = "%s/api/processHeaderDocument" % grobid_url # 仅处理头部的URLfiles = []if return_coordinates: # 如果需要返回坐标files += [("teiCoordinates", (None, "persName")),("teiCoordinates", (None, "figure")),("teiCoordinates", (None, "ref")),("teiCoordinates", (None, "formula")),("teiCoordinates", (None, "biblStruct")),]if isinstance(pdf_path, str): # 如果pdf_path是字符串if validate_url(pdf_path) and op.splitext(pdf_path)[-1].lower() != ".pdf":print("输入的URL必须以``.pdf``结尾")parsed_article = Noneelif validate_url(pdf_path) and op.splitext(pdf_path)[-1] == ".pdf":page = urllib.request.urlopen(pdf_path).read() # 从URL下载PDFparsed_article = requests.post(url, files={"input": page}).text # 通过GROBID处理下载的PDFelif op.exists(pdf_path): # 如果pdf_path是文件路径parsed_article = requests.post(url, files={"input": open(pdf_path, "rb")}).text # 通过GROBID处理文件else:parsed_article = Noneelif isinstance(pdf_path, bytes): # 如果pdf_path是字节# 假设传入的是字节字符串parsed_article = requests.post(url, files={"input": pdf_path}).text # 通过GROBID处理字节else:parsed_article = Noneif soup and parsed_article is not None: # 如果需要返回BeautifulSoup对象parsed_article = BeautifulSoup(parsed_article, "lxml")return parsed_article
1.2.3 提取作者信息/parse_authors、出版日期/parse_date、摘要/parse_abstract、段落/parse_sections
- 函数parse_authors从 BeautifulSoup 文章对象中提取作者信息
def parse_authors(article):"""Parse authors from a given BeautifulSoup of an article"""# 从文章的 BeautifulSoup 对象中查找包含作者信息的 "sourcedesc" 标签,然后找到其中所有的 "persname" 标签author_names = article.find("sourcedesc").findAll("persname")# 创建一个空列表,用于保存解析的作者名字authors = []# 遍历每个作者标签for author in author_names:# 查找作者的名字,并进行处理,如果不存在则返回空字符串firstname = author.find("forename", {"type": "first"})firstname = firstname.text.strip() if firstname is not None else ""# 查找作者的中间名,并进行处理,如果不存在则返回空字符串middlename = author.find("forename", {"type": "middle"})middlename = middlename.text.strip() if middlename is not None else ""# 查找作者的姓氏,并进行处理,如果不存在则返回空字符串lastname = author.find("surname")lastname = lastname.text.strip() if lastname is not None else ""# 判断中间名是否存在,然后将名、中间名和姓组合在一起if middlename is not "":authors.append(firstname + " " + middlename + " " + lastname)else:authors.append(firstname + " " + lastname)# 使用"; "连接所有的作者名,生成一个字符串authors = "; ".join(authors)# 返回最终的作者名字符串return authors
- 下面这个parse_date函数是提取初版日期,从 BeautifulSoup 文章对象中提取出版日期
def parse_date(article):"""Parse date from a given BeautifulSoup of an article"""# 从文章的 BeautifulSoup 对象中查找包含出版日期信息的 "publicationstmt" 标签pub_date = article.find("publicationstmt")# 在 "publicationstmt" 标签下查找 "date" 标签year = pub_date.find("date")# 尝试获取 "date" 标签的 "when" 属性,如果标签不存在则返回空字符串year = year.attrs.get("when") if year is not None else ""# 返回解析出的年份return year
- 而parse_abstract这个函数则是提取摘要,即从 BeautifulSoup 文章对象中提取摘要
def parse_abstract(article):"""Parse abstract from a given BeautifulSoup of an article"""# 从文章的 BeautifulSoup 对象中查找 "abstract" 标签div = article.find("abstract")# 初始化摘要字符串为空abstract = ""# 遍历 "abstract" 标签下的所有直接子节点for p in list(div.children):# 如果子节点不是纯文本(NavigableString)且子节点的子元素数量大于0if not isinstance(p, NavigableString) and len(list(p)) > 0:# 将子节点下的所有非纯文本子元素的文本内容加入摘要字符串abstract += " ".join([elem.text for elem in p if not isinstance(elem, NavigableString)])# 返回解析出的摘要return abstract
- 而parse_sections则是提取段落,从 BeautifulSoup 文章对象中提取文章的各个部分或段落,且它还计算每个部分中的引用数量
def parse_sections(article, as_list: bool = False):"""从给定的BeautifulSoup文章中解析章节列表参数==========as_list: bool, 如果为True,则将输出文本作为段落列表,而不是将其连接成一个单一的文本"""# 找到文章中的"text"部分article_text = article.find("text")# 获取所有带有特定属性的"div"标签divs = article_text.find_all("div", attrs={"xmlns": "http://www.tei-c.org/ns/1.0"})sections = [] # 初始化章节列表for div in divs:div_list = list(div.children)if len(div_list) == 0:heading = ""text = ""elif len(div_list) == 1:# 如果只有一个子元素if isinstance(div_list[0], NavigableString):heading = str(div_list[0])text = ""else:heading = ""text = div_list[0].textelse:text = []heading = div_list[0]if isinstance(heading, NavigableString):heading = str(heading)p_all = list(div.children)[1:]else:heading = ""p_all = list(div.children)for p in p_all:if p is not None:try:text.append(p.text) # 尝试添加文本except:passif not as_list:text = "\n".join(text)# 如果标题或文本不为空if heading is not "" or text is not "":# 计算参考文献数量ref_dict = calculate_number_of_references(div)sections.append({"heading": heading,"text": text,"n_publication_ref": ref_dict["n_publication_ref"],"n_figure_ref": ref_dict["n_figure_ref"],})return sections
1.2.4 计算引用与解析文献引用/parse_references(article)
- calculate_number_of_references:计算给定部分中的引用数量
def calculate_number_of_references(div):"""对于给定的章节,计算章节中的参考文献数量"""# 计算给定章节中的文献引用数量n_publication_ref = len(# 列表推导式查找所有type属性为"bibr"的"ref"标签[ref for ref in div.find_all("ref") if ref.attrs.get("type") == "bibr"])# 计算给定章节中的图形引用数量n_figure_ref = len(# 列表推导式查找所有type属性为"figure"的"ref"标签[ref for ref in div.find_all("ref") if ref.attrs.get("type") == "figure"])# 返回一个字典,包含文献引用数量和图形引用数量return {"n_publication_ref": n_publication_ref, "n_figure_ref": n_figure_ref}
- parse_references(article):解析文献引用
功能:从给定的BeautifulSoup对象中解析文献引用列表
主要步骤:
寻找包含引用的部分
对于每个引用,提取文章标题、期刊、发布日期和作者信息
返回包含所有引用信息的列表def parse_references(article):"""从给定的BeautifulSoup文章中解析引用列表"""reference_list = [] # 初始化引用列表# 在文章中查找文本部分中的引用部分references = article.find("text").find("div", attrs={"type": "references"})# 如果存在引用,则查找所有的"biblstruct"标签,否则返回空列表references = references.find_all("biblstruct") if references is not None else []reference_list = [] # 再次初始化引用列表for reference in references:# 尝试查找引用的文章标题title = reference.find("title", attrs={"level": "a"})if title is None:title = reference.find("title", attrs={"level": "m"})title = title.text if title is not None else ""# 尝试查找引用的期刊名journal = reference.find("title", attrs={"level": "j"})journal = journal.text if journal is not None else ""if journal is "":journal = reference.find("publisher")journal = journal.text if journal is not None else ""# 查找引用的出版年份year = reference.find("date")year = year.attrs.get("when") if year is not None else ""authors = [] # 初始化作者列表# 遍历引用中的所有作者for author in reference.find_all("author"):firstname = author.find("forename", {"type": "first"})firstname = firstname.text.strip() if firstname is not None else ""middlename = author.find("forename", {"type": "middle"})middlename = middlename.text.strip() if middlename is not None else ""lastname = author.find("surname")lastname = lastname.text.strip() if lastname is not None else ""# 根据是否有中间名来组合作者的全名if middlename is not "":authors.append(firstname + " " + middlename + " " + lastname)else:authors.append(firstname + " " + lastname)authors = "; ".join(authors) # 将所有作者连接为一个字符串# 将标题、期刊、年份和作者添加到引用列表中reference_list.append({"title": title, "journal": journal, "year": year, "authors": authors})return reference_list # 返回引用列表
1.2.5 解析图形和表格、公式
- parse_figure_caption(article)
功能:从给定的BeautifulSoup对象中解析图形和表格
主要步骤:
搜索所有图形
对于每个图形或表格,提取标签、类型、ID、标题和数据
返回包含所有图形/表格信息的列表def parse_figure_caption(article):"""从给定的BeautifulSoup文章中解析图表列表"""figures_list = [] # 初始化图表列表# 在文章中查找所有的"figure"标签figures = article.find_all("figure")for figure in figures:# 获取图标的类型(可能是图或表)和IDfigure_type = figure.attrs.get("type") or ""figure_id = figure.attrs.get("xml:id") or ""# 获取图标的标签(如"图1")label = figure.find("label").textif figure_type == "table":# 如果图形类型为表,则获取表的标题和数据caption = figure.find("figdesc").textdata = figure.table.textelse:# 否则,只获取图形的标题,并将数据设置为空字符串caption = figure.textdata = ""# 将标签、类型、ID、标题和数据添加到图形列表中figures_list.append({"figure_label": label,"figure_type": figure_type,"figure_id": figure_id,"figure_caption": caption,"figure_data": data,})return figures_list # 返回图表列表
- parse_figures(...):
功能:使用pdffigures2工具从给定的科学PDF中解析图形
主要步骤:
检查输出文件夹是否存在,如果不存在则创建它
在输出文件夹中创建子文件夹来保存数据和图形
使用Java运行pdffigures2工具解析图形
打印完成消息def parse_figures(pdf_folder: str,jar_path: str = PDF_FIGURES_JAR_PATH,resolution: int = 300,output_folder: str = "figures", ):"""使用pdffigures2从给定的科学PDF中提取图形。参数==========pdf_folder: str, 包含PDF文件的文件夹的路径。一个文件夹必须只包含PDF文件。jar_path: str, pdffigures2-assembly-0.0.12-SNAPSHOT.jar文件的默认路径。resolution: int, 输出图形的分辨率。output_folder: str, 我们希望保存解析数据(与图形相关)和图形的文件夹的路径。输出======folder: 在output_folder/data和output_folder/figures中创建文件夹,分别包含解析数据和图形。"""# 检查output_folder是否存在,如果不存在,则创建它。if not op.isdir(output_folder):os.makedirs(output_folder)# 在output_folder内创建“data”和“figures”子文件夹。data_path = op.join(output_folder, "data")figure_path = op.join(output_folder, "figures")if not op.exists(data_path):os.makedirs(data_path)if not op.exists(figure_path):os.makedirs(figure_path)# 如果data和figures文件夹存在,则执行pdffigures2命令。if op.isdir(data_path) and op.isdir(figure_path):args = ["java","-jar",jar_path,pdf_folder,"-i",str(resolution),"-d",op.join(op.abspath(data_path), ""),"-m",op.join(op.abspath(figure_path), ""), # end path with "/"]_ = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=20)print("完成从PDFs中提取图形!")else:print("您可能需要检查output文件夹路径中的``data``和``figures``。")
- parse_formulas(article):解析公式
功能:从给定的BeautifulSoup对象中解析公式
主要步骤:
搜索所有公式
提取公式的ID、文本和坐标
返回包含所有公式信息的列表def parse_formulas(article):"""从给定的BeautifulSoup文章中解析公式列表"""formulas_list = [] # 初始化公式列表# 在文章中查找所有的"formula"标签formulas = article.find_all("formula")for formula in formulas:# 获取公式的IDformula_id = formula.attrs["xml:id"] or ""# 获取公式的文本内容formula_text = formula.text# 尝试获取公式的坐标formula_coordinates = formula.attrs.get("coords") or ""if formula_coordinates is not "":# 如果有坐标,将它们转换为浮点数列表formula_coordinates = [float(x) for x in formula_coordinates.split(",")]# 将ID、文本和坐标添加到公式列表中formulas_list.append({"formula_id": formula_id,"formula_text": formula_text,"formula_coordinates": formula_coordinates,})return formulas_list # 返回公式列表
1.2.6 把标题/作者/摘要/图形/公式等转换为JSON格式的字典
- convert_article_soup_to_dict(article, as_list=False):
功能:将BeautifulSoup对象转换为JSON格式的字典,类似于某些开源项目的输出
主要步骤:
提取文章的标题、作者、发布日期、摘要、部分、引用、图形和公式
返回一个包含所有这些信息的字典def convert_article_soup_to_dict(article, as_list: bool = False):"""将BeautifulSoup对象转换为JSON格式的函数与https://github.com/allenai/science-parse/ 的输出类似参数==========article: BeautifulSoup输出======article_json: dict, 给定文章的解析字典,格式如下:{'title': ...,'abstract': ...,'sections': [{'heading': ..., 'text': ...},{'heading': ..., 'text': ...},...],'references': [{'title': ..., 'journal': ..., 'year': ..., 'authors': ...},{'title': ..., 'journal': ..., 'year': ..., 'authors': ...},...],'figures': [{'figure_label': ..., 'figure_type': ..., 'figure_id': ..., 'figure_caption': ..., 'figure_data': ...},...]}"""article_dict = {} # 初始化文章字典if article is not None:# 从文章中获取主标题title = article.find("title", attrs={"type": "main"})title = title.text.strip() if title is not None else ""article_dict["title"] = title# 解析文章的作者article_dict["authors"] = parse_authors(article)# 解析文章的发布日期article_dict["pub_date"] = parse_date(article)# 解析文章的摘要article_dict["abstract"] = parse_abstract(article)# 解析文章的各个部分article_dict["sections"] = parse_sections(article, as_list=as_list)# 解析文章的参考文献article_dict["references"] = parse_references(article)# 解析文章的图表article_dict["figures"] = parse_figure_caption(article)# 解析文章的公式article_dict["formulas"] = parse_formulas(article)# 从文章中获取DOIdoi = article.find("idno", attrs={"type": "DOI"})doi = doi.text if doi is not None else ""article_dict["doi"] = doireturn article_dictelse:return None # 如果文章不存在,返回None
- parse_pdf_to_dict(...)
功能:解析给定的PDF并返回解析后的文章的字典
主要步骤:
使用外部工具或服务(如GROBID)解析PDF
将解析后的BeautifulSoup对象转换为字典格式
返回该字典
这个函数的目的是解析给定的PDF文件,并将其转换为一个结构化的字典。首先,它使用parse_pdf函数来解析PDF,然后使用convert_article_soup_to_dict函数将解析后的BeautifulSoup对象转换为字典def parse_pdf_to_dict(pdf_path: str,fulltext: bool = True,soup: bool = True,as_list: bool = False,return_coordinates: bool = True,grobid_url: str = GROBID_URL, ):"""解析给定的PDF并返回解析后的文章字典参数==========pdf_path: str, 出版物或文章的路径fulltext: bool, 是否提取完整文本soup: bool, 是否返回BeautifulSoupas_list: bool, 是否返回部分列表return_coordinates: bool, 是否返回坐标grobid_url: str, grobid服务器的url,默认为`GROBID_URL`可更改为 "https://cloud.science-miner.com/grobid/" 使用云服务输出=====article_dict: dict, 文章的字典"""# 使用parse_pdf函数解析PDFparsed_article = parse_pdf(pdf_path,fulltext=fulltext,soup=soup,return_coordinates=return_coordinates,grobid_url=grobid_url,)# 将BeautifulSoup对象转换为字典article_dict = convert_article_soup_to_dict(parsed_article, as_list=as_list)return article_dict # 返回解析后的文章字典
1.3 论文检索:ChatPaper/auto_survey/utils
具体包含如下功能(这个基于GPT4的文献总结工具的项目auto-draft也提供类似的功能)
- 自动搜索相关文献, 提供真实有出处的引用
- 自动生成LaTeX格式,markdown格式的调研结果
1.3.1 /utils/knowledge_databases/ml_textbook_test
// 待更
1.3.2 /utils/embeddings.py
# 导入HuggingFace的文本嵌入功能
from langchain.embeddings import HuggingFaceEmbeddings
# 导入操作系统相关的模块,用于获取环境变量等操作
import os# 从环境变量中获取OpenAI的API密钥
openai_api_key = os.getenv("OPENAI_API_KEY")
# 如果获取到了OpenAI的API密钥
if openai_api_key is not None:# 导入OpenAI的文本嵌入功能from langchain.embeddings.openai import OpenAIEmbeddings# 使用获取到的API密钥初始化OpenAI的文本嵌入openai_embedding = OpenAIEmbeddings(model="text-embedding-ada-002", openai_api_key=openai_api_key)
else:# 如果没有获取到API密钥,则将OpenAI的文本嵌入设为Noneopenai_embedding = None# 定义HuggingFace的模型名称
model_name = 'sentence-transformers/all-MiniLM-L6-v2'
# 设置模型的参数,这里是将模型放在CPU上运行
model_kwargs = {'device': 'cpu'}
# 设置文本嵌入的参数,这里是不对嵌入进行归一化
encode_kwargs = {'normalize_embeddings': False}# 使用上述参数初始化HuggingFace的文本嵌入
all_minilm_l6_v2 = HuggingFaceEmbeddings(model_name=model_name,model_kwargs=model_kwargs,encode_kwargs=encode_kwargs)# 创建一个字典来存储上述两种文本嵌入,方便后续调用
EMBEDDINGS = {"text-embedding-ada-002": openai_embedding, "all-MiniLM-L6-v2": all_minilm_l6_v2}
1.3.3 /utils/gpt_interaction.py
// 待更
1.3.4 /utils/knowledge.py
定义了一个Knowledge类,该类使用关键词字典从数据库中搜索相关内容,并可以将这些内容转化为提示文本或JSON格式
import tiktoken # 导入tiktoken模块,用于计算tokens数量
from random import shuffle # 从random模块导入shuffle函数,用于随机打乱列表# 使用`tiktoken`来计算文本中的tokens数量
tokenizer_name = tiktoken.encoding_for_model('gpt-4') # 为"gpt-4"模型获取相应的编码器名称
tokenizer = tiktoken.get_encoding(tokenizer_name.name) # 获取编码器实例def tiktoken_len(text):# 计算给定文本中的tokens数量tokens = tokenizer.encode(text, disallowed_special=()) # 对文本进行编码并返回tokensreturn len(tokens) # 返回tokens的数量class Knowledge:# 定义一个Knowledge类来处理知识数据库相关操作def __init__(self, db):self.db = db # 数据库实例self.contents = [] # 用于存放内容的列表def collect_knowledge(self, keywords_dict, max_query):"""根据给定的关键词字典,从数据库中搜索并收集相关的知识。keywords_dict:示例: {"machine learning": 5, "language model": 2};"""db = self.dbif max_query > 0:for kw in keywords_dict:docs = db.similarity_search_with_score(kw, k=max_query) # 使用关键词在数据库中进行相似度搜索for i in range(max_query):content = {"content": docs[i][0].page_content.replace('\n', ' '), # 移除换行符"score": docs[i][1]} # 为每个文档添加评分self.contents.append(content) # 将内容添加到contents列表中shuffle(self.contents) # 随机打乱contents列表def to_prompts(self, max_tokens=2048):# 将收集到的知识内容转化为提示文本,且tokens总数不超过max_tokensif len(self.contents) == 0:return ""prompts = []tokens = 0for idx, content in enumerate(self.contents):prompt = "Reference {}: {}\n".format(idx, content["content"])tokens += tiktoken_len(prompt)if tokens >= max_tokens:breakelse:prompts.append(prompt) # 将提示文本添加到prompts列表中return "".join(prompts) # 返回连接后的提示文本def to_json(self):# 将收集到的知识内容转化为JSON格式if len(self.contents) == 0:return {}output = {}for idx, content in enumerate(self.contents):output[str(idx)] = {"content": content["content"],"score": str(content["score"])}print(output)return output
1.3.5 /utils/references.py
这个代码文件主要注意实现了以下功能
1.3.5.1 第一部分:References
类之外
-
Reference类的说明
- 从给定的
.bib
文件中读取论文,并用search_paper_abstract
方法填充缺失的摘要 - 根据一些关键词使用Semantic Scholar API查找相关论文
- 从所选论文中生成Bibtex引用格式
- 从所选论文中生成提示(prompts)。示例提示格式为:
{"paper_id": "paper summary"}
。
- 从给定的
-
待完成的任务(todo)
- 加载预定义的论文;
- 使用Semantic Scholar API查找所有相关作品;
- 将所有引文添加到
bib_papers
; - 将所有被引文添加到
bib_papers
; - 使用Semantic Scholar查找它们的嵌入;
- 将引文分组以减少tokens的数量
-
一些基本的工具
- evaluate_cosine_similarity:计算两个向量的余弦相似性
def evaluate_cosine_similarity(v1, v2):try:return np.dot(v1, v2)/(norm(v1)*norm(v2))except ValueError:return 0.0
- chunks 将一个较长的列表分割为较小的批次,以便于处理;
def chunks(lst, chunk_size=MAX_BATCH_SIZE):"""Splits a longer list to respect batch size"""for i in range(0, len(lst), chunk_size):yield lst[i : i + chunk_size]
- embed 通过向Semantic Scholar的API发送请求,为一组论文计算嵌入(即将论文映射到一个向量空间中)
def embed(papers):embeddings_by_paper_id: Dict[str, List[float]] = {}for chunk in chunks(papers):# Allow Python requests to convert the data above to JSONresponse = requests.post(URL, json=chunk)if response.status_code != 200:raise RuntimeError("Sorry, something went wrong, please try later!")for paper in response.json()["preds"]:embeddings_by_paper_id[paper["paper_id"]] = paper["embedding"]return embeddings_by_paper_id
- get_embeddings 为给定的论文标题和描述获取嵌入
def get_embeddings(paper_title, paper_description):output = [{"title": paper_title, "abstract": paper_description, "paper_id": "target_paper"}]emb_vector = embed(output)["target_paper"]target_paper = output[0]target_paper["embeddings"] = emb_vectorreturn target_paper
- get_top_k 获取与给定论文最相关的
k
篇论文
具体而言,从提供的papers_dict 中找到与给定的paper_title和paper_description最相似的前k篇论文,并返回。至于相似性是通过计算两篇论文嵌入向量的余弦相似度来确定的def get_top_k(papers_dict, paper_title, paper_description, k=None):# 获取目标论文的嵌入向量target_paper = get_embeddings(paper_title, paper_description)# 存放所有的论文信息,其中应包含嵌入向量papers = papers_dict # 如果k小于papers的数量,返回k篇最相关的论文# 如果k大于等于papers的数量或k为None,返回所有论文max_num_papers = len(papers) # 获取论文总数if k is None: # 如果k为None,设置k为论文总数k = max_num_papersnum_papers = min(k, max_num_papers) # 确定需要返回的论文数量# 获取目标论文的嵌入向量target_embedding_vector = target_paper["embeddings"]# 计算每篇论文与目标论文的余弦相似度for k in papers:v = papers[k]embedding_vector = v["embeddings"] # 获取当前论文的嵌入向量cos_sim = evaluate_cosine_similarity(embedding_vector, target_embedding_vector) # 计算余弦相似度papers[k]["cos_sim"] = cos_sim # 存储余弦相似度到papers中# 返回相似度最高的前k篇论文sorted_papers = {k: v for k, v in sorted(papers.items(), key=lambda x: x[1]["cos_sim"], reverse=True)[:num_papers]}# 从返回的论文中移除嵌入向量信息for key in sorted_papers:sorted_papers[key].pop("embeddings", None)return sorted_papers
- remove_newlines 去除摘要中的换行符,减少提示的长度
def remove_newlines(serie):# This function is applied to the abstract of each paper to reduce the length of prompts.serie = serie.replace('\n', ' ')serie = serie.replace('\\n', ' ')serie = serie.replace(' ', ' ')serie = serie.replace(' ', ' ')return serie
- evaluate_cosine_similarity:计算两个向量的余弦相似性
-
从.bib文件加载论文信息
- 读取.bib文件,并将其解析为一个python对象;
- 通过load_papers_from_bibtex 函数遍历这个对象,从中提取论文的各种属性(如ID、标题、期刊、年份、作者、摘要等);
def load_papers_from_bibtex(bib_file_path):with open(bib_file_path) as bibtex_file:bib_database = bibtexparser.load(bibtex_file)if len(bib_database.entries) == 0:return []else:bib_papers = []for bibitem in bib_database.entries:# Add each paper to `bib_papers`paper_id = bibitem.get("ID")title = bibitem.get("title")if title is None:continuejournal = bibitem.get("journal")year = bibitem.get("year")author = bibitem.get("author")abstract = bibitem.get("abstract")if abstract is None:abstract = search_paper_abstract(title)result = {"paper_id": paper_id,"title": title,"link": "","abstract": abstract,"authors": author,"year": year,"journal": journal}bib_papers.append(result)return bib_papers
- 对于缺失摘要的论文,使用
search_paper_abstract
函数查询摘要def search_paper_abstract(title):pg = ProxyGenerator()success = pg.FreeProxies() # pg.ScraperAPI("921b16f94d701308b9d9b4456ddde155")if success:try:scholarly.use_proxy(pg)# input the title of a paper, return its abstractsearch_query = scholarly.search_pubs(title)found_paper = next(search_query)except:return ""else:return ""# raise RuntimeError("ScraperAPI fails.")return remove_newlines(found_paper['bib']['abstract'])
-
计算文本的tokens数量
- 使用
tokenizer
对象来计算给定文本的tokens的数量# `tokenizer`: used to count how many tokens tokenizer_name = tiktoken.encoding_for_model('gpt-4') tokenizer = tiktoken.get_encoding(tokenizer_name.name)def tiktoken_len(text):# evaluate how many tokens for the given texttokens = tokenizer.encode(text, disallowed_special=())return len(tokens)
- 使用
-
使用Semantic Scholar (SS) API搜索论文
- 使用Semantic Scholar API搜索指定关键词的论文;
- 从API返回的数据中提取论文的各种属性
-
parse_search_results
函数这部分主要关于从搜索结果中提取学术论文的相关信息:
该函数的目的是对传入的搜索结果进行解析,并将其转换为一个论文信息列表。- 首先检查传入的搜索结果是否为空。
- 逐个解析每篇论文的内容,包括作者信息、年份、标题等。
- 对某些字段进行特殊处理,如将日志名中的
&
替换为\&
。 - 如果存在摘要的“tldr”(即“过长不读”)版本,它会被优先使用,否则会使用原始摘要。
- 最后,所有提取出的信息将被组合成一个字典并添加到结果列表中
且函数下方的代码调用了一个假设的ss_search
方法,然后使用上述函数处理这些搜索结果def parse_search_results(search_results_ss):# 判断搜索结果是否为空if len(search_results_ss) == 0:return []# 将搜索结果转换为论文字典的列表papers_ss = []for raw_paper in search_results_ss:# 如果论文没有摘要,跳过此论文if raw_paper["abstract"] is None:continue# 提取作者信息authors_str, last_name = extract_author_info(raw_paper['authors'])# 获取论文的发表年份year_str = str(raw_paper['year'])# 获取论文标题title = raw_paper['title']# 有些期刊的名字可能包含"&"字符;将其替换掉journal = raw_paper['venue'].replace("&", "\\&")# 如果没有提供期刊名,就默认为“arXiv preprint”if not journal:journal = "arXiv preprint"# 根据作者姓、发表年份和标题提取论文IDpaper_id = extract_paper_id(last_name, year_str, title).lower()# 转换外部ID为链接link = externalIds2link(raw_paper['externalIds'])# 如果存在tldr摘要,使用tldr摘要;否则,使用原始摘要并移除其中的换行符if tldr and raw_paper['tldr'] is not None:abstract = raw_paper['tldr']['text']else:abstract = remove_newlines(raw_paper['abstract'])# 有些论文可能没有嵌入;处理这种情况embeddings_dict = raw_paper.get('embedding')if embeddings_dict is None:continueelse:embeddings = raw_paper['embedding']['vector']# 组合结果result = {"paper_id": paper_id,"title": title,"abstract": abstract,"link": link,"authors": authors_str,"year": year_str,"journal": journal,"embeddings": embeddings}# 将结果添加到论文列表中papers_ss.append(result)# 返回论文列表return papers_ss# 使用关键字进行搜索 raw_results = ss_search(keyword, limit=counts) # 如果获取到了原始搜索结果 if raw_results is not None:# 提取搜索结果数据search_results = raw_results.get("data")# 如果搜索结果是空的,设置为空列表if search_results is None:search_results = [] # 如果没有获取到原始搜索结果,设置为空列表 else:search_results = [] # 解析搜索结果并返回 results = parse_search_results(search_results) return results
1.3.5.2 第二部分:References
类
该类用于管理论文引用:
- 初始化方法:当创建一个References对象时,可以选择为其提供标题、论文列表、关键词以及描述
- load_papers 方法:加载给定BibTeX格式的论文到引用类中
- generate_keywords_dict 方法:生成一个关键词字典,其中每个关键词都关联一个论文数量
- collect_papers 方法:使用给定的关键词字典收集尽可能多的论文。这个方法尝试收集给定关键词的相关论文,并添加到类的内部存储中
- to_bibtex 方法:将保存的论文列表转换为BibTeX格式的文件
- _get_papers 方法:一个内部方法,用于从内部存储中获取论文列表
- to_prompts 方法:将引用转换为提示格式,这可能是为了后续使用某种机器学习模型
- to_json 方法:将论文列表转换为JSON格式
- 代码的最后部分(在if __name__ == "__main__":之后)是一个简单的测试部分,用于测试上述代码的功能
//待更
1.4 ChatPaper/chat_paper.py
chat_paper.py,包含一个Paper类、Reader类和chat_paper_mian函数。该程序功能为根据读者输入的搜索查询和感兴趣的关键词,从Arxiv数据库中获取文章,并对文章进行摘要和总结。程序使用了OpenAI的GPT-3模型生成文本摘要,使用了arxiv包获取Arxiv数据库中的文章。程序会将摘要和总结以markdown文件的形式保存下来。
1.4.1 Paper类
- Paper 类代表了一篇论文,它可以从 PDF 文件中解析出论文的元信息和内容,并提供了一些函数用于获取论文信息,如获取文章标题,获取章节名称及内容等。主要方法有:
- parse_pdf:解析PDF文件
其中的self._get_all_page_index() 和self._get_all_page() 这两个方法 下文很快会定义def parse_pdf(self): # 定义一个方法来解析PDF文件self.pdf = fitz.open(self.path) # 使用fitz库打开指定路径的pdf文件self.text_list = [page.get_text() for page in self.pdf] # 从每一页中提取文本并存放到列表中self.all_text = ' '.join(self.text_list) # 将每一页的文本连接成一个完整的字符串self.section_page_dict = self._get_all_page_index() # 获取段落与其对应的页码字典print("section_page_dict", self.section_page_dict) # 打印该段落与页码的对应字典self.section_text_dict = self._get_all_page() # 获取段落与其对应的内容字典self.section_text_dict.update({"title": self.title}) # 将标题添加到段落内容字典中self.section_text_dict.update({"paper_info": self.get_paper_info()}) # 获取论文的信息并添加到字典中self.pdf.close() # 关闭pdf文件
- get_all_page_index:各个部分与页码的对应字典
def _get_all_page_index(self):# 定义需要寻找的章节名称列表section_list = ["Abstract", 'Introduction', 'Related Work', 'Background', "Preliminary", "Problem Formulation",'Methods', 'Methodology', "Method", 'Approach', 'Approaches',# exp"Materials and Methods", "Experiment Settings",'Experiment', "Experimental Results", "Evaluation", "Experiments", "Results", 'Findings', 'Data Analysis', "Discussion", "Results and Discussion", "Conclusion",'References']# 初始化一个字典来存储找到的章节和它们在文档中出现的页码section_page_dict = {}# 遍历每一页文档for page_index, page in enumerate(self.pdf):# 获取当前页面的文本内容cur_text = page.get_text()# 遍历需要寻找的章节名称列表for section_name in section_list:# 将章节名称转换成大写形式section_name_upper = section_name.upper()# 如果当前页面包含"Abstract"这个关键词if "Abstract" == section_name and section_name in cur_text:# 将"Abstract"和它所在的页码加入字典中section_page_dict[section_name] = page_index# 如果当前页面包含章节名称,则将章节名称和它所在的页码加入字典中else:if section_name + '\n' in cur_text:section_page_dict[section_name] = page_indexelif section_name_upper + '\n' in cur_text:section_page_dict[section_name] = page_index# 返回所有找到的章节名称及它们在文档中出现的页码return section_page_dict
- get_all_page:各个部分与内容对应的字典
def _get_all_page(self):"""获取PDF文件中每个页面的文本信息,并将文本信息按照章节组织成字典返回。"""text = '' # 初始化空字符串用于临时储存文本text_list = [] # 初始化列表用于储存每一页的文本section_dict = {} # 初始化章节字典text_list = [page.get_text() for page in self.pdf] # 从每一页获取文本for sec_index, sec_name in enumerate(self.section_page_dict): # 遍历章节页码字典print(sec_index, sec_name, self.section_page_dict[sec_name]) # 打印章节索引、章节名和章节起始页码if sec_index <= 0 and self.abs: # 如果是第一个章节并且存在摘要,则跳过continueelse:start_page = self.section_page_dict[sec_name] # 获取章节的起始页码# 如果当前章节不是最后一个,则获取下一个章节的起始页码作为当前章节的结束页码if sec_index < len(list(self.section_page_dict.keys()))-1:end_page = self.section_page_dict[list(self.section_page_dict.keys())[sec_index+1]]else: # 否则当前章节的结束页码为PDF的最后一页end_page = len(text_list)print("start_page, end_page:", start_page, end_page) # 打印起始和结束页码cur_sec_text = '' # 初始化当前章节的文本# 如果起始页码和结束页码相同,说明章节在同一页内if end_page - start_page == 0:next_sec = list(self.section_page_dict.keys())[sec_index+1]# 下面的代码是为了确定当前章节的文本的起始和结束位置# 这部分代码处理可能存在的大小写不一致的问题start_i = text_list[start_page].find(sec_name) if text_list[start_page].find(sec_name) != -1 else text_list[start_page].find(sec_name.upper())end_i = text_list[start_page].find(next_sec) if text_list[start_page].find(next_sec) != -1 else text_list[start_page].find(next_sec.upper())cur_sec_text += text_list[start_page][start_i:end_i]else: # 否则,章节可能跨越多页for page_i in range(start_page, end_page):# 下面的代码是为了确定在每一页中章节文本的起始和结束位置if page_i == start_page:start_i = text_list[start_page].find(sec_name) if text_list[start_page].find(sec_name) != -1 else text_list[start_page].find(sec_name.upper())cur_sec_text += text_list[page_i][start_i:]elif page_i < end_page:cur_sec_text += text_list[page_i]elif page_i == end_page:next_sec = list(self.section_page_dict.keys())[sec_index+1]end_i = text_list[start_page].find(next_sec) if text_list[start_page].find(next_sec) != -1 else text_list[start_page].find(next_sec.upper())cur_sec_text += text_list[page_i][:end_i]# 在当前章节的文本中去除多余的换行符section_dict[sec_name] = cur_sec_text.replace('-\n', '').replace('\n', ' ')return section_dict # 返回章节字典
- get_paper_info:获取论文的摘要信息
首先尝试从self.section_text_dict 字典中获取摘要,如果没有,则使用self.abs。最后,它从标题页的文本中移除摘要的内容并返回def get_paper_info(self): # 定义一个方法获取论文的信息first_page_text = self.pdf[self.title_page].get_text() # 从PDF的标题页中提取文本if "Abstract" in self.section_text_dict.keys(): # 如果"Abstract"(摘要)在字典的关键字中abstract_text = self.section_text_dict['Abstract'] # 从字典中获取摘要的文本else: # 否则abstract_text = self.abs # 使用self.abs作为摘要的文本first_page_text = first_page_text.replace(abstract_text, "") # 从首页面文本中移除摘要内容return first_page_text # 返回处理后的首页面文本
- get_chapter_names:根据字体大小,识别每个章节名称,并返回一个列表
- get_title:获取论文标题
def get_title(self):doc = self.pdf # 打开pdf文件max_font_size = 0 # 初始化最大字体大小为0max_string = "" # 初始化最大字体大小对应的字符串为空max_font_sizes = [0]for page_index, page in enumerate(doc): # 遍历每一页text = page.get_text("dict") # 获取页面上的文本信息blocks = text["blocks"] # 获取文本块列表for block in blocks: # 遍历每个文本块if block["type"] == 0 and len(block['lines']): # 如果是文字类型if len(block["lines"][0]["spans"]):font_size = block["lines"][0]["spans"][0]["size"] # 获取第一行第一段文字的字体大小 max_font_sizes.append(font_size)if font_size > max_font_size: # 如果字体大小大于当前最大值max_font_size = font_size # 更新最大值max_string = block["lines"][0]["spans"][0]["text"] # 更新最大值对应的字符串max_font_sizes.sort() print("max_font_sizes", max_font_sizes[-10:])cur_title = ''for page_index, page in enumerate(doc): # 遍历每一页text = page.get_text("dict") # 获取页面上的文本信息blocks = text["blocks"] # 获取文本块列表for block in blocks: # 遍历每个文本块if block["type"] == 0 and len(block['lines']): # 如果是文字类型if len(block["lines"][0]["spans"]):cur_string = block["lines"][0]["spans"][0]["text"] # 更新最大值对应的字符串font_flags = block["lines"][0]["spans"][0]["flags"] # 获取第一行第一段文字的字体特征font_size = block["lines"][0]["spans"][0]["size"] # 获取第一行第一段文字的字体大小 # print(font_size)if abs(font_size - max_font_sizes[-1]) < 0.3 or abs(font_size - max_font_sizes[-2]) < 0.3: # print("The string is bold.", max_string, "font_size:", font_size, "font_flags:", font_flags) if len(cur_string) > 4 and "arXiv" not in cur_string: # print("The string is bold.", max_string, "font_size:", font_size, "font_flags:", font_flags) if cur_title == '' :cur_title += cur_string else:cur_title += ' ' + cur_string self.title_page = page_index# breaktitle = cur_title.replace('\n', ' ') return title
1.4.2 Reader类
Reader类包含了下载文章、筛选文章以及使用OpenAI的GPT-3模型生成文本摘要和总结的方法。主要方法有:
- get_arxiv(): 使用Arxiv的API获取搜索结果
- filter_arxiv(): 筛选文章,并返回筛选后的结果
- download_pdf(): 从Arxiv下载筛选后的文章
- summary_with_chat(): 对每一篇下载下来的文章进行文本摘要和总结,并将结果以markdown文件的形式保存
该函数的实现主要分为三个部分
首先,第一步:用title,abs和introduction进行总结
其次,第二步:总结方法# 遍历论文列表for paper_index, paper in enumerate(paper_list):# 第一步:用title,abs和introduction进行总结text = ''text += 'Title:' + paper.titletext += 'Url:' + paper.urltext += 'Abstract:' + paper.abstext += 'Paper_info:' + paper.section_text_dict['paper_info']# 添加introductiontext += list(paper.section_text_dict.values())[0]chat_summary_text = ""# 尝试与聊天机器人对话以获取摘要try:chat_summary_text = self.chat_summary(text=text)except Exception as e: # 捕获所有异常print("summary_error:", e)import sysexc_type, exc_obj, exc_tb = sys.exc_info() # 获取异常信息fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]print(exc_type, fname, exc_tb.tb_lineno)if "maximum context" in str(e): # 如果错误信息中包含特定字符串current_tokens_index = str(e).find("your messages resulted in") + len("your messages resulted in") + 1offset = int(str(e)[current_tokens_index:current_tokens_index + 4])summary_prompt_token = offset + 1000 + 150chat_summary_text = self.chat_summary(text=text, summary_prompt_token=summary_prompt_token)# 添加到html列表中htmls.append('## Paper:' + str(paper_index + 1))htmls.append('\n\n\n')htmls.append(chat_summary_text)
最后,第三步:总结全文并打分# 第二步:总结方法。# 由于有些文章的方法章节名是算法名,所以简单的通过关键词来筛选很难获取method_key = ''for parse_key in paper.section_text_dict.keys():if 'method' in parse_key.lower() or 'approach' in parse_key.lower():method_key = parse_keybreak# 如果找到方法关键词if method_key != '':text = ''method_text = ''summary_text = ''summary_text += "<summary>" + chat_summary_textmethod_text += paper.section_text_dict[method_key]text = summary_text + "\n\n<Methods>:\n\n" + method_textchat_method_text = ""try:chat_method_text = self.chat_method(text=text)except Exception as e:print("method_error:", e)import sysexc_type, exc_obj, exc_tb = sys.exc_info()fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]print(exc_type, fname, exc_tb.tb_lineno)if "maximum context" in str(e):current_tokens_index = str(e).find("your messages resulted in") + len("your messages resulted in") + 1offset = int(str(e)[current_tokens_index:current_tokens_index + 4])method_prompt_token = offset + 800 + 150chat_method_text = self.chat_method(text=text, method_prompt_token=method_prompt_token)htmls.append(chat_method_text)else:chat_method_text = ''htmls.append("\n" * 4)
# 第三步:总结全文并打分。conclusion_key = ''for parse_key in paper.section_text_dict.keys():if 'conclu' in parse_key.lower():conclusion_key = parse_keybreaktext = ''conclusion_text = ''summary_text = ''summary_text += "<summary>" + chat_summary_text + "\n <Method summary>:\n" + chat_method_textif conclusion_key != '':conclusion_text += paper.section_text_dict[conclusion_key]text = summary_text + "\n\n<Conclusion>:\n\n" + conclusion_textelse:text = summary_textchat_conclusion_text = ""try:chat_conclusion_text = self.chat_conclusion(text=text)except Exception as e:print("conclusion_error:", e)import sysexc_type, exc_obj, exc_tb = sys.exc_info()fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]print(exc_type, fname, exc_tb.tb_lineno)if "maximum context" in str(e):current_tokens_index = str(e).find("your messages resulted in") + len("your messages resulted in") + 1offset = int(str(e)[current_tokens_index:current_tokens_index + 4])conclusion_prompt_token = offset + 800 + 150chat_conclusion_text = self.chat_conclusion(text=text, conclusion_prompt_token=conclusion_prompt_token)htmls.append(chat_conclusion_text)htmls.append("\n" * 4)# 整合成一个文件并保存date_str = str(datetime.datetime.now())[:13].replace(' ', '-')export_path = os.path.join(self.root_path, 'export')if not os.path.exists(export_path):os.makedirs(export_path)mode = 'w' if paper_index == 0 else 'a'file_name = os.path.join(export_path,date_str + '-' + self.validateTitle(paper.title[:80]) + "." + self.file_format)self.export_to_markdown("\n".join(htmls), file_name=file_name, mode=mode)htmls = []
- chat_summary():第一次提取title,abs,和introduction,设定prompt通过调用API的方式得到对应的总结
def chat_summary(self, text, summary_prompt_token=1100):# 设置OpenAI API密钥openai.api_key = self.chat_api_list[self.cur_api]# 更新API密钥索引,用于循环使用多个API密钥(如果有)self.cur_api += 1self.cur_api = 0 if self.cur_api >= len(self.chat_api_list) - 1 else self.cur_api# 计算输入文本的token数量text_token = len(self.encoding.encode(text))# 计算截断文本的索引,确保总的token数量不超过限制clip_text_index = int(len(text) * (self.max_token_num - summary_prompt_token) / text_token)# 获取截断后的文本clip_text = text[:clip_text_index]# 定义聊天机器人的交互消息messages = [{"role": "system","content": "You are a researcher in the field of [" + self.key_word + "] who is good at summarizing papers using concise statements"},{"role": "assistant","content": "This is the title, author, link, abstract and introduction of an English document. I need your help to read and summarize the following questions: " + clip_text},{"role": "user", "content": """...(这部分是详细的指示内容,为了简洁我略过了)...""".format(self.language, self.language, self.language)},]# 根据API类型调用相应的方法if openai.api_type == 'azure':response = openai.ChatCompletion.create(engine=self.chatgpt_model,messages=messages,)else:response = openai.ChatCompletion.create(model=self.chatgpt_model,messages=messages,)# 从响应中提取机器人的回复result = ''for choice in response.choices:result += choice.message.content# 打印结果和使用的token数量以及响应时间print("summary_result:\n", result)print("prompt_token_used:", response.usage.prompt_tokens,"completion_token_used:", response.usage.completion_tokens,"total_token_used:", response.usage.total_tokens)print("response_time:", response.response_ms / 1000.0, 's')# 返回结果return result
- chat_method():提取上面chat_summary()得到的结果,加上method或approach部分的内容,设定prompt通过调用API的方式得到对应的总结
def chat_method(self, text, method_prompt_token=800):# 设置OpenAI的API keyopenai.api_key = self.chat_api_list[self.cur_api]# 将当前API索引递增,以便下次使用不同的API keyself.cur_api += 1# 如果当前API索引超出API key列表的长度,则将其重置为0(实现循环使用API key列表)self.cur_api = 0 if self.cur_api >= len(self.chat_api_list) - 1 else self.cur_api# 使用encoding方法计算输入文本的token数量text_token = len(self.encoding.encode(text))# 根据最大token数量和方法提示token计算需要裁剪的文本长度clip_text_index = int(len(text) * (self.max_token_num - method_prompt_token) / text_token)# 根据上面计算的索引裁剪文本clip_text = text[:clip_text_index]# 定义要发送到ChatGPT的消息列表messages = [# 定义系统角色的消息,描述用户的专业背景和能力{"role": "system", "content": "You are a researcher in the field of [" + self.key_word + "] who is good at summarizing papers using concise statements"},# 定义助手角色的消息,描述要助手完成的任务{"role": "assistant", "content": "This is the <summary> and <Method> part of an English document, where <summary> you have summarized, but the <Methods> part, I need your help to read and summarize the following questions." + clip_text},# 定义用户角色的消息,给出具体的问题和期望格式{"role": "user", "content": """ 7. Describe in detail the methodological idea of this article. Be sure to use {} answers (proper nouns need to be marked in English). For example, its steps are.- (1):...- (2):...- (3):...- .......Follow the format of the output that follows: 7. Methods: \n\n- (1):xxx;\n - (2):xxx;\n - (3):xxx;\n ....... \n\n Be sure to use {} answers (proper nouns need to be marked in English), statements as concise and academic as possible, do not repeat the content of the previous <summary>, the value of the use of the original numbers, be sure to strictly follow the format, the corresponding content output to xxx, in accordance with \n line feed, ....... means fill in according to the actual requirements, if not, you can not write. """.format(self.language, self.language)},]# 根据API类型选择适当的调用方法if openai.api_type == 'azure':response = openai.ChatCompletion.create(engine=self.chatgpt_model,messages=messages,)else:response = openai.ChatCompletion.create(model=self.chatgpt_model,messages=messages,)# 从返回的答案中初始化一个空字符串用于保存结果result = ''# 遍历返回的选择,将内容添加到结果字符串中for choice in response.choices:result += choice.message.content# 打印方法的结果和相关的token使用情况print("method_result:\n", result)print("prompt_token_used:", response.usage.prompt_tokens,"completion_token_used:", response.usage.completion_tokens,"total_token_used:", response.usage.total_tokens)# 打印响应时间print("response_time:", response.response_ms / 1000.0, 's')# 返回结果字符串return result
- chat_conclusion():提取上面两部分:chat_summary()和chat_method()得到的结果(API给的回复),加上conclusion部分的内容,设定prompt通过调用API的方式得到对应的总结
def chat_conclusion(self, text, conclusion_prompt_token=800):# 设置OpenAI的API密钥openai.api_key = self.chat_api_list[self.cur_api]# 使当前API索引递增,以便下次使用不同的API密钥self.cur_api += 1# 如果当前API索引超过API密钥列表的长度,将其重置为0self.cur_api = 0 if self.cur_api >= len(self.chat_api_list) - 1 else self.cur_api# 使用encoding方法计算输入文本的token数量text_token = len(self.encoding.encode(text))# 计算需要裁剪的文本长度,以适应模型的最大token限制clip_text_index = int(len(text) * (self.max_token_num - conclusion_prompt_token) / text_token)# 裁剪文本clip_text = text[:clip_text_index]# 定义要发送给ChatGPT的消息列表messages = [# 系统角色的消息,描述用户作为一个审稿人的背景{"role": "system", "content": "You are a reviewer in the field of [" + self.key_word + "] and you need to critically review this article"},# 助手角色的消息,描述要助手完成的任务{"role": "assistant", "content": "This is the <summary> and <conclusion> part of an English literature, where <summary> you have already summarized, but <conclusion> part, I need your help to summarize the following questions:" + clip_text},# 用户角色的消息,提供具体问题和预期的答案格式{"role": "user", "content": """ 8. Make the following summary.Be sure to use {} answers (proper nouns need to be marked in English).- (1):What is the significance of this piece of work?- (2):Summarize the strengths and weaknesses of this article in three dimensions: innovation point, performance, and workload. .......Follow the format of the output later: 8. Conclusion: \n\n- (1):xxx;\n - (2):Innovation point: xxx; Performance: xxx; Workload: xxx;\n Be sure to use {} answers (proper nouns need to be marked in English), statements as concise and academic as possible, do not repeat the content of the previous <summary>, the value of the use of the original numbers, be sure to strictly follow the format, the corresponding content output to xxx, in accordance with \n line feed, ....... means fill in according to the actual requirements, if not, you can not write. """.format(self.language, self.language)},]# 根据API类型选择适当的方法来获取模型的答案if openai.api_type == 'azure':response = openai.ChatCompletion.create(engine=self.chatgpt_model,messages=messages,)else:response = openai.ChatCompletion.create(model=self.chatgpt_model,messages=messages,)# 初始化结果字符串result = ''# 遍历模型返回的答案,将其添加到结果字符串中for choice in response.choices:result += choice.message.content# 打印结论部分的结果和token使用情况print("conclusion_result:\n", result)print("prompt_token_used:", response.usage.prompt_tokens,"completion_token_used:", response.usage.completion_tokens,"total_token_used:", response.usage.total_tokens)# 打印响应时间print("response_time:", response.response_ms / 1000.0, 's')# 返回结果字符串return result
1.4.3 chat_paper_main
// 待更
1.5 RUN一下:ChatPaper代码整体运行后得到的部分结果
chatpaper代码运行后得到的部分结果 输出:标题、作者、单位、 关键词、相关链接及 Summary。其中
- Summary为总结 得到的摘要
- method_result:对论文方法(method或approach)的总结
- Conclusion_result:对论文全文的总结(包含工作意义及创新点等)
//待更
第二部分 gpt_academic源码解读
// 待更