环境:openEuler、Python 3.11.6、Azure OpenAI、langchain 0.3.3、langgraph 0.2.38
背景:基于FastAPI、langchain实现一个QA系统,要求实现历史记录以及存储特征信息
时间:20241022
说明:在历史记录的存储中,增加自定义的存储内容,后期还要以流的形式实现
官方文档地址:Add summary of the conversation history
源码地址:尚无
1、环境搭建(略)
2、功能案例
1、实现记录和总结(langgraph)
概要:使用langgraph实现对话系统,该系统具备聊天历史记录和总结功能
官方文档地址:Add summary of the conversation history
实现具有总结和历史记录功能的chatbot
from typing import Literal

from langchain_core.messages import SystemMessage, RemoveMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState, StateGraph, START, END
from langchain_core.messages import HumanMessage
from langchain_openai import AzureChatOpenAI

# Load the api_key and related settings from the environment file.
from dotenv import load_dotenv

load_dotenv(".env")

# In-memory checkpointer used to record the graph state.
memory = MemorySaver()


class State(MessagesState):
    """Graph state: the ``messages`` key from MessagesState plus a summary."""

    # Running summary of the conversation, written by summarize_conversation.
    summary: str


# Azure-hosted chat model used by the graph nodes.
model = AzureChatOpenAI(deployment_name="aicontent-validation")

# Logic for calling the model; this is a graph node.
def call_model(state: State):
    """Conversation node: send the history to the model and return its reply.

    When the state already carries a summary, it is prepended as a system
    message so the model sees the condensed earlier context.

    Returns:
        A state update whose ``messages`` list holds the single model reply.
    """
    history = state["messages"]
    earlier = state.get("summary", "")
    if earlier:
        preamble = SystemMessage(content=f"Summary of conversation earlier: {earlier}")
        history = [preamble] + history
    reply = model.invoke(history)
    # Returned as a list so it is appended to the existing messages in the state.
    return {"messages": [reply]}


# Decide whether to end or to run summarize_conversation.
def should_continue(state: State) -> Literal["summarize_conversation", END]:
    """Return the next node to execute."""
    # More than 5 messages in the state: route to the summarization node.
    if len(state["messages"]) > 5:
        return "summarize_conversation"
    # Otherwise this graph run is finished.
    return END


def summarize_conversation(state: State):
    """Summarization node: refresh the summary and trim the message history.

    Returns:
        A state update with the new ``summary`` text and, under ``messages``,
        RemoveMessage markers for every message except the last two.
    """
    history = state["messages"]
    previous = state.get("summary", "")
    # The instruction differs depending on whether a summary already exists.
    if not previous:
        instruction = "Create a summary of the conversation above:"
    else:
        instruction = (
            f"This is summary of the conversation to date: {previous}\n\n"
            "Extend the summary by taking into account the new messages above:"
        )
    # The model's reply to this request is the summary text.
    response = model.invoke(history + [HumanMessage(content=instruction)])
    # These are removal markers (not the messages that are kept): everything
    # except the two most recent messages is deleted from the state.
    stale = [RemoveMessage(id=m.id) for m in history[:-2]]
    return {"summary": response.content, "messages": stale}


# Define a new graph.
workflow = StateGraph(State)

# Register the two nodes: conversation and summarization.
workflow.add_node("conversation", call_model)
workflow.add_node(summarize_conversation)

# Entry edge: the graph starts at "conversation" (the call_model function).
workflow.add_edge(START, "conversation")

# Conditional edge leaving "conversation": should_continue picks the next
# node after every run of the conversation node.
workflow.add_conditional_edges(
    "conversation",
    should_continue,
)

# One-way edge from summarize_conversation to END.
workflow.add_edge("summarize_conversation", END)

# Compile with the checkpointer so state is recorded in `memory`.
app = workflow.compile(checkpointer=memory)

# Helper for printing streamed updates.
def print_update(update):
    """Pretty-print the messages of a streamed graph update, then any summary.

    Args:
        update: Mapping whose values are per-node state updates (dicts). Each
            may hold a ``messages`` list of objects exposing ``pretty_print()``
            and/or a ``summary`` string.
    """
    # Only the values are used, so iterate them directly.
    for node_output in update.values():
        # Tolerate updates that carry no messages instead of raising KeyError.
        for message in node_output.get("messages", []):
            message.pretty_print()
        if "summary" in node_output:
            print(node_output["summary"])
# 定义配置文件,用于后期查询历史
config = {"configurable": {"thread_id": "4"}}
调用graph实现对话,当消息数超过5条(即达到6条)时进行总结
# The three questions below are sent in turn; for each one the question is
# printed, then every streamed update from the graph.
for question in ("hi! I'm bob", "what's my name?", "i like the celtics!"):
    input_message = HumanMessage(content=question)
    input_message.pretty_print()
    for event in app.stream({"messages": [input_message]}, config, stream_mode="updates"):
        print_update(event)
输出:
[root@Laptop-latitude-7300 chatbot_fastapi]# python test.py
================================ Human Message =================================hi! I'm bob
================================== Ai Message ==================================Hello Bob! How can I assist you today?
================================ Human Message =================================what's my name?
================================== Ai Message ==================================Your name is Bob.
================================ Human Message =================================i like the celtics!
================================== Ai Message ==================================That's great! The Boston Celtics are indeed a fantastic basketball team. They have a rich history and a large fanbase. Are you excited about the upcoming games?
================================ Remove Message ================================================================ Remove Message ================================================================ Remove Message ================================================================ Remove Message ================================In this conversation, the user introduces himself as Bob. He also expresses his liking for the Boston Celtics, a professional basketball team. The assistant acknowledges his interest and asks if he is excited about the upcoming games.
其中pretty_print、print_update是graph格式化的输出,示例如下:
input_message.pretty_print()实现了如下输出
================================ Human Message =================================i like the celtics!
print_update(event)输出如下:
================================== Ai Message ==================================Your name is Bob.
或
================================ Remove Message ================================
2、FastAPI、langgraph实现chatbot(非流式)
基于上述代码,实现非流式输出的问答系统
安装相应的模块
pip install fastapi python-dotenv langchain_core langchain langgraph uvicorn langchain_openai
通过fastapi实现接口
# main.py
from dotenv import load_dotenv

# Called before the remaining imports — presumably so modules that build the
# Azure client at import time can see the credentials; confirm against chat.py.
load_dotenv(".env")
from fastapi import FastAPI, Request, WebSocket
from pydantic import BaseModel
from json import loads
from chat import generate_response, get_chat_history

app = FastAPI()


# Request and response models.
class ChatRequest(BaseModel):
    """Body of a chat request: the user's id and input text."""

    user_id: str
    input: str


class ChatResponse(BaseModel):
    """Body of a chat reply."""

    response: str


class ChatHistoryResponse(BaseModel):
    """Body of a chat-history reply."""

    history: list
# 问答接口
@app.get("/chat")
async def websocket_ping(query: str, user_id: str):
    """Answer one question for ``user_id`` over HTTP GET (non-streaming).

    Returns:
        ``{"response": <text produced by generate_response>}``.
    """
    answer = await generate_response(query, user_id)
    return {"response": answer}


# Chat-history endpoint.
@app.get("/chat_history/{user_id}")
async def get_history_endpoint(user_id: str):
    """Return the stored chat history of ``user_id`` under the ``history`` key."""
    return {"history": get_chat_history(user_id)}
# Start the server on all interfaces, port 8000, when run as a script.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
通过调用generate_response函数,返回响应
生成响应以及获取历史记录的函数
from langchain_core.messages import HumanMessage, AIMessage
from langgraph_test import graphapp
from langchain_core.output_parsers import StrOutputParser
from langgraph.checkpoint.serde import jsonplus
from ignore_socket import ignore_msgpack_default

# Replace the checkpoint serializer's msgpack fallback with the project's own.
# NOTE(review): ignore_socket is a local module not shown here; presumably it
# lets otherwise non-serializable objects be skipped — confirm.
jsonplus._msgpack_default = ignore_msgpack_default

# Conversion helper: turn AIMessage / HumanMessage objects into plain dicts.
def message_to_dict(message):
    """Convert a HumanMessage or AIMessage into a plain dict.

    Returns:
        A dict with ``content``, ``type`` ('human' or 'ai'),
        ``additional_kwargs``, ``response_metadata`` and ``id``; AI messages
        additionally carry ``usage_metadata``.

    Raises:
        ValueError: If ``message`` is neither a HumanMessage nor an AIMessage.
    """
    if isinstance(message, HumanMessage):
        kind = 'human'
    elif isinstance(message, AIMessage):
        kind = 'ai'
    else:
        raise ValueError(f"Unsupported message type: {type(message)}")

    payload = {
        'content': message.content,
        'type': kind,
        'additional_kwargs': message.additional_kwargs,
        'response_metadata': message.response_metadata,
        'id': message.id,
    }
    # Only AI messages expose usage metadata in the result.
    if kind == 'ai':
        payload['usage_metadata'] = message.usage_metadata
    return payload


# Function that generates the chat reply.
async def generate_response(input_text: str, user_id: str):
    """Run the graph once for ``user_id`` and return the latest message text.

    Args:
        input_text: The user's question.
        user_id: Used as the graph's ``thread_id`` so history is kept per user.

    Returns:
        The ``content`` of the last message in the resulting state.
    """
    thread_config = {"configurable": {"thread_id": user_id}}
    question = HumanMessage(content=input_text)
    final_state = await graphapp.ainvoke(
        {"messages": question}, config=thread_config, stream_mode="values"
    )
    last_message = final_state.get("messages")[-1]
    return last_message.content


# Fetch the chat history.
def get_chat_history(user_id: str):
    """Return the stored state of a user's thread with messages simplified.

    Args:
        user_id: The ``thread_id`` whose state is looked up.

    Returns:
        A dict of the thread's state values in which ``messages`` is a list of
        ``{"content": ..., "type": ...}`` dicts. ``messages`` is an empty list
        when the thread has none or contains an unsupported message type.
    """
    config = {"configurable": {"thread_id": user_id}}
    # Copy so the dict owned by the state snapshot is never mutated in place.
    values = dict(graphapp.get_state(config).values)
    try:
        # Keep only the content and type of each message.
        values["messages"] = [
            {"content": entry["content"], "type": entry["type"]}
            for entry in map(message_to_dict, values["messages"])
        ]
    except (KeyError, ValueError):
        # KeyError: the thread has no "messages" yet.
        # ValueError: message_to_dict met an unsupported message type.
        values["messages"] = []
    # Returned after the try block (not from `finally`) so that unrelated
    # errors are no longer silently swallowed.
    return values
使用langgraph实现对话功能
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import Literal
from langchain_core.messages import HumanMessage
from langchain_openai import AzureChatOpenAI
from langchain_core.messages import SystemMessage, RemoveMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict, Annotated
from langchain_core.messages import AnyMessage

# In-memory checkpointer for the graph state.
memory = MemorySaver()

# This model is used for both conversation and summarization.
llm = AzureChatOpenAI(deployment_name="aicontent-validation")

# Prompt template (only used so that the chain outputs a plain string).
prompt_template = PromptTemplate(
    input_variables=["query"],
    template="You are a helpful assistant. User: {query}. Assistant:",
)

# LangChain processing chain: prompt -> model -> string output.
chain = prompt_template | llm | StrOutputParser()


class State(TypedDict):
    """Graph state with an extra ``summary`` attribute."""

    # Message list merged with the add_messages reducer.
    messages: Annotated[list[AnyMessage], add_messages]
    # Running summary of the conversation.
    summary: str


# Logic for calling the model.
async def call_model(state: State):
    """Conversation node: run the chain on the history and return an AI reply.

    When the state carries a summary, it is prepended as a system message.

    Returns:
        A state update whose ``messages`` list holds one AIMessage.
    """
    # Imported locally: this module's top-level imports do not include AIMessage.
    from langchain_core.messages import AIMessage

    summary = state.get("summary", "")
    if summary:
        system_message = f"Summary of conversation earlier: {summary}"
        messages = [SystemMessage(content=system_message)] + state["messages"]
    else:
        messages = state["messages"]
    # The chain ends in StrOutputParser, so `response` is a plain string.
    response = await chain.ainvoke(messages)
    # Wrap the text in an AIMessage: a bare string appended through the
    # add_messages reducer would be stored as a human message.
    return {"messages": [AIMessage(content=response)]}


# Next: the logic deciding whether to end or to summarize the conversation.
async def should_continue(state: State) -> Literal["summarize_conversation", END]:
    """Return the next node to execute."""
    # More than 1 message in the state: summarize the conversation.
    if len(state["messages"]) > 1:
        return "summarize_conversation"
    # Otherwise the run can end.
    return END


async def summarize_conversation(state: State):
    """Summarization node: refresh the summary and trim the message history.

    Returns:
        A state update with the new ``summary`` string and, under ``messages``,
        RemoveMessage markers for every message except the last two.
    """
    history = state["messages"]
    previous = state.get("summary", "")
    # A different instruction is used when a summary already exists.
    if not previous:
        instruction = "Create a summary of the conversation above:"
    else:
        instruction = (
            f"This is summary of the conversation to date: {previous}\n\n"
            "Extend the summary by taking into account the new messages above:"
        )
    # History followed by the summarization request; the chain returns text.
    response = await chain.ainvoke(history + [HumanMessage(content=instruction)])
    # Delete everything except the two most recent messages.
    stale = [RemoveMessage(id=m.id) for m in history[:-2]]
    return {"summary": response, "messages": stale}


# Define the new graph.
workflow = StateGraph(State)

# Register the conversation node and the summarize node.
workflow.add_node("conversation", call_model)
workflow.add_node(summarize_conversation)

# The entry point is the conversation node.
workflow.add_edge(START, "conversation")

# Conditional edge: the first argument is the source node, so these edges are
# taken after "conversation" runs; should_continue decides the next node.
workflow.add_conditional_edges(
    "conversation",
    should_continue,
)

# Normal edge from summarize_conversation to END: the run ends after it.
workflow.add_edge("summarize_conversation", END)

# Finally, compile with the in-memory checkpointer.
graphapp = workflow.compile(checkpointer=memory)
测试功能
启动服务:uvicorn main:app --reload --host 0.0.0.0 --port 8000
浏览器访问:http://172.26.20.199:8000/docs
打开chat接口的折叠,点击try it out,在输入框输入信息,点击下面的执行,稍微等待,如下:
再次执行输入:what is my name,用来测试是否具备记忆功能
测试总结功能:使用上文中 user_id 为 123 的会话,请求 chat_history 接口查看返回结果
在summary中存在总结(摘要),成功
3、FastAPI、langgraph实现chatbot(流式)
基于上述代码,实现基于FastAPI、langchain、langgraph的流式传输。
FastAPI官方文档:WebSocket - FastAPI 中文
主要借鉴官方文档,在此基础上实现了流式的传输
安装相应的模块
pip install fastapi python-dotenv langchain_core langchain langgraph uvicorn websockets langchain_openai
1、设计html文件,实现websocket请求,基于FastAPI文档魔改
<!DOCTYPE html>
<!-- websocket_test.html -->
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>pc</title>
</head>
<body>
<div id="rowAiChatbot">
</div>
<script>
    // Open the chat WebSocket, send one question on connect, log every reply.
    var ws = new WebSocket('ws://localhost:8000/chat');
    ws.onopen = function () {
        console.log('ws onopen');
        ws.send('{"query":"my name is jack", "user_id":"123"}');
    };
    ws.onmessage = function (e) {
        console.log('ws onmessage');
        console.log('from server: ' , e.data);
    };
</script>
</body>
</html>
2、设计FastAPI主程序,实现接收参数并执行
# main.py
from dotenv import load_dotenv

load_dotenv(".env")
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from json import loads
from chat import generate_response, get_chat_history

app = FastAPI()


@app.websocket("/chat")
async def websocket_ping(websocket: WebSocket):
    """Serve the chat WebSocket: one JSON request per text frame.

    Each frame is parsed as JSON and its ``query`` and ``user_id`` fields are
    handed to generate_response together with the socket, which is expected
    to send the reply back over it.
    """
    await websocket.accept()
    try:
        while True:
            # Wait for the next request from the client.
            data = await websocket.receive_text()
            jsondata = loads(data)
            # Generate the response.
            await generate_response(jsondata.get("query"), jsondata.get("user_id"), websocket)
    except WebSocketDisconnect:
        # The client closed the connection; end the handler quietly.
        pass
3、实现调用graph,实现流式输出
借鉴langgraph文档:LangGraph:Stream events from the final node
# chat.py
from langchain_core.messages import HumanMessage, AIMessage
from langgraph_test import graphapp# 定义生成对话的函数
async def generate_response(input_text: str, user_id: str, websocket: object):
    """Stream the graph's reply for ``user_id`` to the client over ``websocket``.

    Args:
        input_text: The user's question.
        user_id: Used as the graph's ``thread_id``.
        websocket: Object exposing an awaitable ``send_text(str)``.
    """
    thread_config = {"configurable": {"thread_id": user_id}}
    question = HumanMessage(content=input_text)
    stream = graphapp.astream({"messages": question}, thread_config, stream_mode="messages")
    async for chunk, meta in stream:
        # Skip empty chunks.
        if not chunk.content:
            continue
        # Only output from the conversation node goes to the front end.
        if meta.get("langgraph_node") != 'conversation':
            continue
        # NOTE(review): presumably ls_provider is set only on chunks emitted by
        # the LLM call itself — confirm.
        if not meta.get("ls_provider"):
            continue
        await websocket.send_text(chunk.content)
4、langgraph实现总结、记录
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import Literal
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import AzureChatOpenAI
from langchain_core.messages import SystemMessage, RemoveMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState, StateGraph, START, END

# In-memory checkpointer for the graph state.
memory = MemorySaver()

# This model is used for both conversation and summarization.
llm = AzureChatOpenAI(deployment_name="aicontent-validation")

# Prompt template (only used so that the chain outputs a plain string).
prompt_template = PromptTemplate(
    input_variables=["query"],
    template="You are a helpful assistant. User: {query}. Assistant:",
)

# LangChain processing chain: prompt -> model -> string output.
chain = prompt_template | llm | StrOutputParser()


class State(MessagesState):
    """Graph state: MessagesState's ``messages`` key plus a ``summary``."""

    # Running summary of the conversation.
    summary: str


# Logic for calling the model.
async def call_model(state: State):
    """Conversation node: run the chain on the history and return an AI reply.

    When the state carries a summary, it is prepended as a system message.

    Returns:
        A state update whose ``messages`` list holds one AIMessage built from
        the chain's string output.
    """
    history = state["messages"]
    earlier = state.get("summary", "")
    if earlier:
        preamble = SystemMessage(content=f"Summary of conversation earlier: {earlier}")
        history = [preamble] + history
    # The chain ends in StrOutputParser, so the result is plain text.
    reply_text = await chain.ainvoke(history)
    # Returned as a list so it is appended to the existing messages.
    return {"messages": [AIMessage(content=reply_text)]}


# Next: the logic deciding whether to end or to summarize the conversation.
async def should_continue(state: State) -> Literal["summarize_conversation", END]:
    """Return the next node to execute."""
    # More than 3 messages in the state: summarize the conversation.
    if len(state["messages"]) > 3:
        return "summarize_conversation"
    # Otherwise the run can end.
    return END


async def summarize_conversation(state: State):
    """Summarization node: refresh the summary and trim the message history.

    Returns:
        A state update with the new ``summary`` string and, under ``messages``,
        RemoveMessage markers for every message except the last two.
    """
    history = state["messages"]
    previous = state.get("summary", "")
    # A different instruction is used when a summary already exists.
    if not previous:
        instruction = "Create a summary of the conversation above:"
    else:
        instruction = (
            f"This is summary of the conversation to date: {previous}\n\n"
            "Extend the summary by taking into account the new messages above:"
        )
    # History followed by the summarization request; the chain returns text.
    response = await chain.ainvoke(history + [HumanMessage(content=instruction)])
    # Delete everything except the two most recent messages.
    stale = [RemoveMessage(id=m.id) for m in history[:-2]]
    return {"summary": response, "messages": stale}


# Define the new graph.
workflow = StateGraph(State)

# Register the conversation node and the summarize node.
workflow.add_node("conversation", call_model)
workflow.add_node(summarize_conversation)

# The entry point is the conversation node.
workflow.add_edge(START, "conversation")

# Conditional edge: the first argument is the source node, so these edges are
# taken after "conversation" runs; should_continue decides the next node.
workflow.add_conditional_edges(
    "conversation",
    should_continue,
)

# Normal edge from summarize_conversation to END: the run ends after it.
workflow.add_edge("summarize_conversation", END)

# Finally, compile with the in-memory checkpointer.
graphapp = workflow.compile(checkpointer=memory)
5、测试流式传输
使用浏览器打开html文件,效果如下:
6、获取历史记录
请求接口
@app.get("/chat_history/{user_id}")
async def get_history_endpoint(user_id: str):
    """Return the stored chat history of ``user_id`` under the ``history`` key."""
    return {"history": get_chat_history(user_id)}
获取逻辑方法,定义一个函数进行数据转换
# chat.py# 转换函数, 将数据中的AIMessage,HumanMessage转换为字符串
def message_to_dict(message):
    """Convert a HumanMessage or AIMessage into a plain dict.

    Returns:
        A dict with ``content``, ``type`` ('human' or 'ai'),
        ``additional_kwargs``, ``response_metadata`` and ``id``; AI messages
        additionally carry ``usage_metadata``.

    Raises:
        ValueError: If ``message`` is neither a HumanMessage nor an AIMessage.
    """
    if isinstance(message, HumanMessage):
        kind = 'human'
    elif isinstance(message, AIMessage):
        kind = 'ai'
    else:
        raise ValueError(f"Unsupported message type: {type(message)}")

    payload = {
        'content': message.content,
        'type': kind,
        'additional_kwargs': message.additional_kwargs,
        'response_metadata': message.response_metadata,
        'id': message.id,
    }
    # Only AI messages expose usage metadata in the result.
    if kind == 'ai':
        payload['usage_metadata'] = message.usage_metadata
    return payload


# Fetch the chat history.
def get_chat_history(user_id: str):
    """Return the stored state of a user's thread with messages simplified.

    Args:
        user_id: The ``thread_id`` whose state is looked up.

    Returns:
        A dict of the thread's state values in which ``messages`` is a list of
        ``{"content": ..., "type": ...}`` dicts. ``messages`` is an empty list
        when the thread has none or contains an unsupported message type.
    """
    config = {"configurable": {"thread_id": user_id}}
    # Copy so the dict owned by the state snapshot is never mutated in place.
    values = dict(graphapp.get_state(config).values)
    try:
        # Keep only the content and type of each message.
        values["messages"] = [
            {"content": entry["content"], "type": entry["type"]}
            for entry in map(message_to_dict, values["messages"])
        ]
    except (KeyError, ValueError):
        # KeyError: the thread has no "messages" yet.
        # ValueError: message_to_dict met an unsupported message type.
        values["messages"] = []
    # Returned after the try block (not from `finally`) so that unrelated
    # errors are no longer silently swallowed.
    return values
浏览器测试
网址:http://172.25.23.143:8000/docs#