文章目录
- LCEL
- 流式调用案例
- invoke的异步调用
- 异步流中的事件
LCEL
LangChain Expression Language,是一种强大的工作流编排工具,可以从基本组件构建复杂的任务链(Chain),有如下亮点:
- 流式支持;
- chain = prompt | model | parser
- chain.stream({“input”: “xxx”})
- chain.astream({“input”: “异步流式调用”})
- 流式,逐个返回数据块,避免因数据量大导致用户等待;
- 异步支持;
- chain.invoke({“input”: “同步调用”})
- chain.ainvoke({“input”: “异步调用”})
- 一次性返回所有数据,如果数据量大,用户可能会一直等待;
- 重试与回退;
- 获取中间结果;
流式调用案例
import os
from langchain_community.chat_models import QianfanChatEndpoint
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, FunctionMessage, ToolMessage
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser, MarkdownListOutputParser
import asyncio# 环境变量,指定ak/sk
os.environ["QIANFAN_AK"] = "xxx"
os.environ["QIANFAN_SK"] = "xxx"# 千帆大模型 (文心一言)
qianfan_chat = QianfanChatEndpoint(model="ERNIE-3.5-8K",temperature=0.2, # 越大越随机回答timeout=50, # 有时网络不好,可能会超时,可以指定长些# api_key="...",# secret_key="...",# top_p="...",# other params...)# 字符串模板
prompt = ChatPromptTemplate.from_template("请简单介绍下用于{area}的深度学习算法都有哪些?")# 模型输出的为 AIMessage,需解析为对应的数据格式
parser = StrOutputParser() # 或者 JsonOutputParser# 组装链
chat_chain = prompt | qianfan_chat | parser# 异步流式调用
async def call():# 异步for循环async for chunck in chat_chain.astream({"area": "图像识别领域"}):# chunck.content print(chunck, end="", flush=True) # chunck对象被解析为字符串或者json类型# 异步入口
asyncio.run(call()) # 传入一个协程
invoke的异步调用
import os
from langchain_community.chat_models import QianfanChatEndpoint
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, FunctionMessage, ToolMessage
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser, MarkdownListOutputParser
import asyncio# 环境变量,指定ak/sk
os.environ["QIANFAN_AK"] = "xxx"
os.environ["QIANFAN_SK"] = "xxx"# 千帆大模型 (文心一言)
qianfan_chat = QianfanChatEndpoint(model="ERNIE-3.5-8K",temperature=0.2, # 越大越随机回答timeout=30, # 有时网络不好,可能会超时,可以指定长些# api_key="...",# secret_key="...",# top_p="...",# other params...)# 定义自己的模板
my_tpl = """
## 角色:你是一个非常专业的数据分析工程师,可以判定数据前后的变化是否属于真实变更。关于文件名、版本的变更都是非真实变更,其他的变更都是真实变更。## 任务目标: before表示变更前的数据,after表示变更后的数据,change_content 表示变更内容,判断变更的内容是否为真实的变更。## before: {before}## after: {after}## change_content: {change_content}## 背景信息: 表格相关的变更基本上都是真实的变更。 ## 约束条件:## 输出格式为 Markdown。## 避免使用过于专业的术语。## 示例:## 如 before 的内容为 “编程语言为python”, after的内容为“编程语言为java”, change_content为“编程语言从python变更为java”, 这属于真实的变更。## 其他说明:## 当你回答的准确率低于60%时,请说明答案的可信度。
"""# 使用自定义的模板
prompt = ChatPromptTemplate.from_template(my_tpl)# 模型输出的为 AIMessage,需解析为对应的数据格式
parser = StrOutputParser()# 组装链
chat_chain = prompt | qianfan_chat | parser# 异步调用
async def call():# 异步调用result = Nonetry:result = await chat_chain.ainvoke({"before": "5", "after": "10", "change_content": "表格CC中第一行、第一列单元格的值从5变成了10"})except:print("except...")return resultdef thread_main():# 子线程中 异步入口,自动管理事件循环(创建,关闭)r = asyncio.run(call()) # 传入一个协程print("result:", r)if __name__ == '__main__':import timeimport threading# 同步调用# r = chat_chain.invoke({"before": "5", "after": "10", "change_content": "表格CC中第一行、第一列单元格的值从5变成了10"})# 放在主线程会报错,所以放入子线程中th = threading.Thread(target=thread_main, name="async_th")th.setDaemon(True) # 设置为主线程的守护线程,主线程退出,则一起退出,不管有没有执行完th.start()# 主线程的事件循环while True:time.sleep(1)print("main thread running...")
异步流中的事件
# 查看异步流 中的事件
async def call():# 异步调用result = Nonetry:async for event in chat_chain.astream_events("hello", version="v2"):print("event:", event)except:print("except...")return result