通过本文你可以了解到:
- 什么是RAG?
- 如何搭建一个RAG应用?
- 目前开源的RAG应用有哪些?
大模型学习参考:
1.大模型学习资料整理:大模型学习资料整理:如何从0到1学习大模型,搭建个人或企业RAG系统,如何评估与优化(更新中…)
2.streamlit入门和简单使用:streamlit:如何快速构建一个应用,不会前端也能写出好看的界面
欢迎大家访问个人博客网址:https://www.maogeshuo.com,博主努力更新中…
RAG简介
检索增强生成(Retrieval Augmented Generation,RAG)是一种强大的工具,整合了从庞大知识库中检索到的相关信息,并以此为基础,指导大型语言模型生成更为精准的答案,从而显著提升了回答的准确性与深度。
2020 年,Meta AI 研究人员提出了RAG的方法,用于提高 LLM 在特定任务上的性能。LLM 擅长语言理解、推理和生成等任务,但也存在一些问题:
- 信息滞后:LLM 的知识是静态的,来源于当时训练时的数据,也就是 LLM 无法直接提供最新的信息。
- 模型幻觉:实践表明,当前的生成式 AI 技术存在一定的幻觉,而在一些常见的业务应用中,我们是希望保证事实性的。
- 私有数据匮乏:LLM 的训练数据主要来源于互联网公开的数据,而垂类领域、企业内部等有很多专属知识,这部分是 LLM 无法直接提供的。
- 内容不可追溯: LLM 生成的内容往往缺乏明确的信息来源,影响内容的可信度。RAG 将生成内容与检索到的原始资料建立链接,增强了内容的可追溯性,从而提升了用户对生成内容的信任度。
- 长文本处理能力较弱: LLM 在理解和生成长篇内容时受限于有限的上下文窗口,且必须按顺序处理内容,输入越长,速度越慢。RAG 通过检索和整合长文本信息,强化了模型对长上下文的理解和生成,有效突破了输入长度的限制,同时降低了调用成本,并提升了整体的处理效率。
RAG 通过将检索到的相关信息提供给 LLM,让 LLM 进行参考生成,可以较好地缓解上述问题。因此,合理使用 RAG 可以拓展 LLM 的知识边界,使其不仅能够访问专属知识库,还能动态地引入最新的数据,从而在生成响应时提供更准确、更新的信息。
RAG组成部分
自定义知识库,用于RAG检索的知识来源:
- 结构化的数据库形态:比如MySQL
- 非结构化的文档体系:比如文件、图片、音频、视频
RAG 是一个完整的系统,其工作流程可以简单地分为数据处理、检索、增强和生成四个阶段:
数据处理阶段
对原始数据进行清洗和处理。
将处理后的数据转化为检索模型可以使用的格式。
将处理后的数据存储在对应的数据库中。
检索阶段
将用户的问题输入到检索系统中,从数据库中检索相关信息。
增强阶段
对检索到的信息进行处理和增强,以便生成模型可以更好地理解和使用。
生成阶段
将增强后的信息输入到生成模型中,生成模型根据这些信息生成答案。
搭建RAG应用
数据处理
数据清洗和处理
数据处理阶段,一般需要对知识库中的数据进行数据清洗,比如去掉多余的换行、特殊符号,然后加载处理后的文件和分块:
- 加载文件:使用
langchain
下的document_loaders
加载pdf、docs、txt、md等格式文件 - 文本分块:分块的方式有很多,选择不同的分块方法、分块大小、chunk_overlap,对最后的检索结果有影响,这一阶段也有RAG的优化点之一
import osfrom langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, UnstructuredFileLoader
from langchain_text_splitters import RecursiveCharacterTextSplitterdef load_document(file):"""加载PDF、DOC、TXT文档:param file::return:"""name, extension = os.path.splitext(file)if extension == '.pdf':print(f'Loading {file}')loader = PyPDFLoader(file)elif extension == '.docx':print(f'Loading {file}')loader = Docx2txtLoader(file)elif extension == '.txt':loader = UnstructuredFileLoader(file)else:print('Document format is not supported!')return Nonedata = loader.load()return datadef chunk_data(data, chunk_size=256, chunk_overlap=150):"""将数据分割成块:param data::param chunk_size: chunk块大小:param chunk_overlap: 重叠部分大小:return:"""text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)chunks = text_splitter.split_documents(data)return chunks
embedding模型存储
将分块后的文本,使用embedding模型持久化存储,目前常用的中文模型是bge-large-zh-v1.5
。持久化存储后,避免每次都去embedding一次,消耗很长的时间。下次使用时,直接加载模型就可以了。
import osfrom langchain_community.embeddings import HuggingFaceBgeEmbeddings, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma, FAISSdef get_embedding(embedding_name):"""根据embedding名称去加载embedding模型:param embedding_name: 路径或者名称:return:"""if embedding_name == "bge":embedding_path = os.environ[embedding_name]model_kwargs = {'device': 'cpu'}return HuggingFaceBgeEmbeddings(model_name=embedding_path, model_kwargs=model_kwargs)if embedding_name == "bce":return None# create embeddings using OpenAIEmbeddings() and save them in a Chroma vector store
def create_embeddings_chroma(chunks):embeddings = OpenAIEmbeddings()vector_store = Chroma.from_documents(chunks, embeddings)# if you want to use a specific directory for chromadb# vector_store = Chroma.from_documents(chunks, embeddings, persist_directory='./mychroma_db')return vector_storedef create_embeddings_faiss(vector_db_path, embedding_name, chunks):"""使用FAISS向量数据库,并保存:param vector_db_path: 向量:param embedding_name::param chunks::return:"""embeddings = get_embedding(embedding_name)db = FAISS.from_documents(chunks, embeddings)if not os.path.isdir(vector_db_path):os.mkdir(vector_db_path)db.save_local(folder_path=vector_db_path)return dbdef load_embeddings_faiss(vector_db_path, embedding_name):"""加载向量库:param vector_db_path::param embedding_name::return:"""embeddings = get_embedding(embedding_name)db = FAISS.load_local(vector_db_path, embeddings, allow_dangerous_deserialization=True)return db
构建模型
采用了函数和类两种方式定义模型:
- 函数:get_llm_model定义了基本的参数,model、prompt、temperature、max_tokens、n_ctx
- 自定义类:
import os
import sys
import time
from abc import ABCfrom langchain_core.callbacks import CallbackManagerForLLMRun
from llama_cpp import Llama
from langchain.llms.base import LLM
from pydantic import Field
from typing import Dict, Any, Mapping, Optional, ListBASE_DIR = os.path.dirname(__file__)
# PRJ_DIR上层目录
# PRJ_DIR = os.path.abspath(os.path.join(BASE_DIR, ".."))
sys.path.append(BASE_DIR)def get_llm_model(prompt: str = None,model: str = None,temperature: float = 0.0,max_token: int = 2048,n_ctx: int = 512):"""根据模型名称去加载模型,返回response数据:param prompt::param model::param temperature::param max_token::param n_ctx::return:"""if model in ['Qwen_q2']:model_path = os.environ[model]llm = Llama(model_path=model_path, n_ctx=n_ctx)start = time.time()response = llm.create_chat_completion(messages=[{"role": "system","content": "你是一个智能超级助手,请用专业的词语回答问题,整体上下文带有逻辑性,如果不知道,请不要乱说",},{"role": "user","content": "{}".format(prompt)},],temperature=temperature,max_tokens=max_token,stream=False)cost = time.time() - startprint(f"模型生成时间:{cost}")print(f"大模型回复:\n{response}")return response['choices'][0]['message']['content']class QwenLLM(LLM):"""自定义QwenLLM"""model_name: str = "Qwen_q2"# 访问时延上限request_timeout: float = None# 温度系数temperature: float = 0.1# 窗口大小n_ctx = 2048# token大小max_tokens = 1024# 必备的可选参数model_kwargs: Dict[str, Any] = Field(default_factory=dict)def _call(self, prompt: str, stop: Optional[List[str]] = None,run_manager: Optional[CallbackManagerForLLMRun] = None,**kwargs: Any):qwen_path = os.environ[self.model_name]print("qwen_path:", qwen_path)llm = Llama(model_path=qwen_path, n_ctx=self.n_ctx)response = llm.create_chat_completion(messages=[{"role": "system","content": "你是一个智能超级助手,请用[中文]专业的词语回答问题,整体上下文带有逻辑性,并以markdown格式输出",},{"role": "user","content": "{}".format(prompt)},],temperature=self.temperature,max_tokens=self.max_tokens,stream=False)# prompt工程提示# print(f"Qwen prompt: \n{prompt}")# response = lla(# prompt=prompt,# temperature=self.temperature,# max_tokens=self.max_tokens# )print(f"Qwen response: \n{response}")# return response['choices'][0]['text']return response['choices'][0]['message']['content']@propertydef _llm_type(self) -> str:return "Llama3"# 定义一个返回默认参数的方法@propertydef _default_params(self) -> Dict[str, Any]:"""获取调用默认参数。"""normal_params = {"temperature": self.temperature,"request_timeout": self.request_timeout,"n_ctx": self.n_ctx,"max_tokens": self.max_tokens}# print(type(self.model_kwargs))return {**normal_params}@propertydef _identifying_params(self) -> Mapping[str, Any]:"""Get the identifying parameters."""return {**{"model_name": self.model_name}, **self._default_params}
构建应用
import sysimport streamlit as st
import os
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
import tiktoken
from dotenv import load_dotenv, find_dotenv
from langchain_core.prompts import PromptTemplateBASE_DIR = os.path.dirname(__file__)
PRJ_DIR = os.path.abspath(os.path.join(BASE_DIR, ".."))
sys.path.append(PRJ_DIR)from streamlit_demo.custom_llm import QwenLLM
from streamlit_demo.embedding_oper import create_embeddings_faiss, create_embeddings_chroma, load_embeddings_faiss
from streamlit_demo.prepare_data import load_document, chunk_data_ = load_dotenv(find_dotenv(), override=True)
vector_db_path = os.path.join(BASE_DIR, "vector_db")
print(f"vector_db_path: {vector_db_path}")DEFAULT_TEMPLATE = """你是一个聪明的超级智能助手,请用专业且富有逻辑顺序的句子回复,并以中文形式且markdown形式输出。检索到的信息:{context}问题:{question}
"""def ask_and_get_answer_from_local(model_name, vector_db, prompt, top_k=5):"""从本地加载大模型:param model_name: 模型名称:param vector_db::param prompt::param top_k::return:"""docs_and_scores = vector_db.similarity_search_with_score(prompt, k=top_k)print("docs_and_scores: ", docs_and_scores)# knowledge = [doc.page_content for doc in docs_and_scores]# print("检索到的知识:", knowledge)if model_name == "Qwen_q2":llm = QwenLLM(model_name=model_name, temperature=0.4)prompt_template = PromptTemplate(input_variables=["context", "question"], template=DEFAULT_TEMPLATE)retriever = vector_db.as_retriever(search_type='similarity', search_kwargs={'k': top_k})chain = RetrievalQA.from_chain_type(llm=llm,chain_type="stuff",retriever=retriever,chain_type_kwargs={"prompt": prompt_template},return_source_documents=True)answer = chain({"query": prompt, "top_k": top_k})print(f"answers: {answer}")# answer = chain.run(prompt)# answer = answer['choices'][0]['message']['content']answer = answer['result']return answerdef ask_and_get_answer(vector_store, q, k=3):llm = ChatOpenAI(model='gpt-3.5-turbo', temperature=1)retriever = vector_store.as_retriever(search_type='similarity', search_kwargs={'k': k})chain = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever)answer = chain.run(q)return answer# calculate embedding cost using tiktoken
def calculate_embedding_cost(texts):enc = tiktoken.encoding_for_model('text-embedding-ada-002')total_tokens = sum([len(enc.encode(page.page_content)) for page in texts])# print(f'Total Tokens: {total_tokens}')# print(f'Embedding Cost in USD: {total_tokens / 1000 * 0.0004:.6f}')return total_tokens, total_tokens / 1000 * 0.0004# clear the chat history from streamlit session state
def clear_history():if 'history' in st.session_state:del st.session_state['history']if __name__ == "__main__":# st.image('img.png')st.subheader('LLM Question-Answering Application 🤖')with st.sidebar:# text_input for the OpenAI API key (alternative to python-dotenv and .env)api_key = st.text_input('OpenAI API Key:', type='password')if api_key:os.environ['OPENAI_API_KEY'] = api_keyllm = st.selectbox(label="请选择本地大模型",options=('Qwen_q2', 'Qwen_q6'))# 向量数据库embedding = st.selectbox("请选择向量数据库",('FAISS', 'Chroma'))# file uploader widgetuploaded_file = st.file_uploader('上传文件:', type=['pdf', 'docx', 'txt'])# chunk size number widgetchunk_size = st.number_input('chunk_size:', min_value=100, max_value=2048, value=512, on_change=clear_history)# chunk overlapchunk_overlap = st.number_input(label="chunk_overlap", min_value=0, max_value=1024, value=150,on_change=clear_history)# k number input widgetk = st.number_input('top_k', min_value=1, max_value=20, value=3, on_change=clear_history)# add data button widgetadd_data = st.button('添加数据', on_click=clear_history)# 输出方式output_type = st.selectbox("选择输出方式", ('普通输出', '流式输出'))if uploaded_file and add_data: # if the user browsed a filewith st.spinner('Reading, chunking and embedding file ...'):# writing the file from RAM to the current directory on diskbytes_data = uploaded_file.read()file_name = os.path.join('./', uploaded_file.name)with open(file_name, 'wb') as f:f.write(bytes_data)data = load_document(file_name)chunks = chunk_data(data, chunk_size=chunk_size, chunk_overlap=chunk_overlap)st.write(f'Chunk size: {chunk_size}, chunk_overlap: {len(chunks)} Chunks: {len(chunks)}')tokens, embedding_cost = calculate_embedding_cost(chunks)st.write(f'Embedding cost: ${embedding_cost:.4f}')# creating the embeddings and returning the Chroma vector store# 指定选择向量库和embedding类型,还可改进if embedding == "FAISS":vector_store = create_embeddings_faiss(vector_db_path=vector_db_path, embedding_name="bge",chunks=chunks)elif embedding == "Chroma":vector_store = create_embeddings_chroma(chunks)# saving the vector store in the streamlit session state (to be persistent between reruns)st.session_state.vs = vector_storest.success('File uploaded, chunked and embedded successfully.')# 初始化historyif "messages" not in st.session_state:st.session_state.messages = []# 展示对话for msg in st.session_state.messages:with st.chat_message(msg['role']):st.markdown(msg["content"])# React to user inputif prompt := st.chat_input("Say something"):# Display user message in chat message containerwith st.chat_message("user"):st.markdown(prompt)# Add user message to chat historyst.session_state.messages.append({"role": "user", "content": prompt})# load local vector dbif 'vs' not in st.session_state:# st.warning(body='正在努力加载模型中...', icon="⚠️")vector_store = load_embeddings_faiss(vector_db_path, "bge")st.session_state.vs = vector_storest.toast('Load vector store db success!', icon='😍')# 普通方式输出if prompt is not None:vector_store = st.session_state.vs# if vector_store is None:# st.warning(body='正在努力加载模型中,稍后再试', icon="⚠️")if output_type == "普通输出" and vector_store is not None:response = ""if llm == "GPT":response = ask_and_get_answer(vector_store, prompt, k)elif llm == "Qwen_q2":response = ask_and_get_answer_from_local(model_name="Qwen_q2", vector_db=vector_store, prompt=prompt, top_k=k)# Display assistant response in chat message containerwith st.chat_message("assistant"):st.markdown(response)# Add assistant response to chat historyst.session_state.messages.append({"role": "assistant", "content": response})else:# 流式输出# stream_res = get_llm_model_with_stream(prompt=prompt, model="Qwen_q2")# with st.chat_message("assistant"):# content = st.write_stream(stream_res)# print("流式输出:", content)# st.session_state.messages.append({"role": "assistant", "content": content})print("流式输出")# run the app: streamlit run ./chat_doc.py
结果展示
使用步骤:
- 选择参数,然后上传本地的文件
- 开始添加数据,用于数据处理和embedding持久化存储
开源的RAG应用
QAnything: https://github.com/netease-youdao/QAnything
AnythingLLM:https://github.com/Mintplex-Labs/anything-llm
ragflow:https://github.com/infiniflow/ragflow/blob/main/README_zh.md