本文翻译整理自:LlamaIndex - Workflow
https://docs.llamaindex.ai/en/stable/module_guides/workflow/
文章目录
- 一、入门
- 1、定义工作流事件
- 2、设置工作流类
- 3、工作流入口点
- 4、Workflow Exit Points
- 5、运行工作流
- 二、绘制工作流程
- 三、使用全局上下文/状态
- 四、等待多个事件
- 五、手动触发事件
- 六、流媒体事件
- 七、失败的重试步骤
- 八、逐步执行
- 九、装饰非类函数
- 十、跨运行维护上下文
- 十一、部署工作流
- 十二、例子
LlamaIndex中的Workflow
是一个事件驱动的抽象,用于将多个事件链接在一起。工作流由steps
组成,每个步骤负责处理某些事件类型并发出新事件。
Workflow
在LlamaIndex中的工作是通过使用@step
装饰器装饰函数。这用于推断每个工作流的输入和输出类型以进行验证,并确保每个步骤仅在接受的事件准备就绪时运行。
您可以创建Workflow
来做任何事情!构建代理、RAG流、提取流或任何您想要的东西。
工作流也被自动检测,因此您可以使用Arize Pheonix等工具将可观察性带入每个步骤。(**注意:**可观察性适用于利用较新的检测系统的集成。用法可能会有所不同。)
小费
工作流使async成为一等公民,本页假定您在异步环境中运行。这对您来说意味着正确设置异步代码。如果您已经在FastAPI等服务器或笔记本中运行,您可以自由使用wait了!
如果您正在运行自己的python脚本,最好的做法是有一个异步切入点。
async def main():w = MyWorkflow(...)result = await w.run(...)print(result)if __name__ == "__main__":import asyncioasyncio.run(main())
一、入门
作为一个说明性示例,让我们考虑一个简单的工作流程,其中生成一个笑话,然后进行批评。
from llama_index.core.workflow import (Event,StartEvent,StopEvent,Workflow,step,
)# `pip install llama-index-llms-openai` if you don't already have it
from llama_index.llms.openai import OpenAIclass JokeEvent(Event):joke: strclass JokeFlow(Workflow):llm = OpenAI()@stepasync def generate_joke(self, ev: StartEvent) -> JokeEvent:topic = ev.topicprompt = f"Write your best joke about {topic}."response = await self.llm.acomplete(prompt)return JokeEvent(joke=str(response))@stepasync def critique_joke(self, ev: JokeEvent) -> StopEvent:joke = ev.jokeprompt = f"Give a thorough analysis and critique of the following joke: {joke}"response = await self.llm.acomplete(prompt)return StopEvent(result=str(response))w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))
这里有一些移动的部分,让我们一块一块地看。
1、定义工作流事件
class JokeEvent(Event):joke: str
事件是用户定义的pydantic对象。您可以控制属性和任何其他辅助方法。在这种情况下,我们的工作流依赖于单个用户定义的事件JokeEvent
。
2、设置工作流类
class JokeFlow(Workflow):llm = OpenAI(model="gpt-4o-mini")...
我们的工作流是通过子类化Workflow
类来实现的。为了简单起见,我们附加了一个静态OpenAI
实例。
3、工作流入口点
class JokeFlow(Workflow):...@stepasync def generate_joke(self, ev: StartEvent) -> JokeEvent:topic = ev.topicprompt = f"Write your best joke about {topic}."response = await self.llm.acomplete(prompt)return JokeEvent(joke=str(response))...
这里,我们来到工作流的入口点。虽然事件是使用定义的,但有两个特殊情况事件,StartEvent
和StopEvent
。在这里,StartEvent
表示将初始工作流输入发送到哪里。
这个StartEvent
是一个特殊的对象,因为它可以保存任意属性。在这里,我们使用ev.topic
访问主题,如果它不存在,会引发错误。您也可以使用ev.get("topic")
来处理属性可能不存在而不会引发错误的情况。
至此,您可能已经注意到,我们并没有明确告诉工作流哪些步骤处理哪些事件,而是使用@step
装饰器来推断每个步骤的输入和输出类型,更进一步,这些推断的输入和输出类型也用于在运行前为您验证工作流是否有效!
4、Workflow Exit Points
class JokeFlow(Workflow):...@stepasync def critique_joke(self, ev: JokeEvent) -> StopEvent:joke = ev.jokeprompt = f"Give a thorough analysis and critique of the following joke: {joke}"response = await self.llm.acomplete(prompt)return StopEvent(result=str(response))...
在这里,我们有了工作流中的第二步,也是最后一步。我们知道这是最后一步,因为返回了特殊的StopEvent
。当工作流遇到返回的StopEvent
时,它会立即停止工作流并返回任何结果。
在这种情况下,结果是一个字符串,但它可以是字典、列表或任何其他对象。
5、运行工作流
w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))
最后,我们创建并运行工作流。有一些设置,如超时(以秒为单位)和冗长来帮助调试。
..run()
方法是异步的,所以我们在这里使用wait来等待结果。
二、绘制工作流程
可以使用步骤定义中类型注释的强大功能来可视化工作流。您可以绘制工作流中所有可能的路径,也可以绘制最近的执行,以帮助调试。
首次安装:
pip install llama-index-utils-workflow
然后导入并使用:
from llama_index.utils.workflow import (draw_all_possible_flows,draw_most_recent_execution,
)# Draw all
draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")# Draw an execution
w = JokeFlow()
await w.run(topic="Pirates")
draw_most_recent_execution(w, filename="joke_flow_recent.html")
三、使用全局上下文/状态
或者,您可以选择在步骤之间使用全局上下文。例如,可能多个步骤访问用户的原始query
输入。您可以将其存储在全局上下文中,以便每个步骤都有访问权限。
from llama_index.core.workflow import Context@step
async def query(self, ctx: Context, ev: MyEvent) -> StopEvent:# retrieve from contextquery = await ctx.get("query")# do something with context and eventval = ...result = ...# store in contextawait ctx.set("key", val)return StopEvent(result=result)
四、等待多个事件
上下文不仅仅保存数据,它还提供实用程序来缓冲和等待多个事件。
例如,您可能有一个步骤在合成响应之前等待查询和检索到的节点:
from llama_index.core import get_response_synthesizer@step
async def synthesize(self, ctx: Context, ev: QueryEvent | RetrieveEvent
) -> StopEvent | None:data = ctx.collect_events(ev, [QueryEvent, RetrieveEvent])# check if we can runif data is None:return None# unpack -- data is returned in orderquery_event, retrieve_event = data# run response synthesissynthesizer = get_response_synthesizer()response = synthesizer.synthesize(query_event.query, nodes=retrieve_event.nodes)return StopEvent(result=response)
使用ctx.collect_events()
我们可以缓冲并等待所有预期的事件到达。这个函数只会在所有事件到达后返回数据(按请求的顺序)。
五、手动触发事件
通常,事件是通过在步骤中返回另一个事件来触发的。但是,也可以使用工作流中的ctx.send_event(event)
方法手动调度事件。
这是一个简短的玩具示例,展示了如何使用它:
from llama_index.core.workflow import step, Context, Event, Workflowclass MyEvent(Event):passclass MyEventResult(Event):result: strclass GatherEvent(Event):passclass MyWorkflow(Workflow):@stepasync def dispatch_step(self, ctx: Context, ev: StartEvent) -> MyEvent | GatherEvent:ctx.send_event(MyEvent())ctx.send_event(MyEvent())return GatherEvent()@stepasync def handle_my_event(self, ev: MyEvent) -> MyEventResult:return MyEventResult(result="result")@stepasync def gather(self, ctx: Context, ev: GatherEvent | MyEventResult) -> StopEvent | None:# wait for events to finishevents = ctx.collect_events([MyEventResult, MyEventResult])if not events:return Nonereturn StopEvent(result=events)
六、流媒体事件
您还可以在事件进入时迭代它们。这对于流式传输、显示进度或调试很有用。
w = MyWorkflow(...)handler = w.run(topic="Pirates")async for event in handler.stream_events():print(event)result = await handler
七、失败的重试步骤
执行失败的步骤可能会导致整个工作流失败,但通常会出现错误,可以安全地重试执行。想象一下,由于网络暂时拥塞而超时的HTTP请求,或者遇到速率限制的外部API调用。
对于所有希望该步骤重试的情况,您可以使用“重试策略”。重试策略是一个对象,它指示工作流多次执行一个步骤,规定在新的尝试之前必须经过多长时间。策略考虑了自第一次故障以来经过了多长时间,连续发生了多少次故障,以及最后发生了哪个错误。
要为特定步骤设置策略,您所要做的就是将策略对象传递给 @step
装饰器:
from llama_index.core.workflow.retry_policy import ConstantDelayRetryPolicyclass MyWorkflow(Workflow):# ...more workflow definition...# This policy will retry this step on failure every 5 seconds for at most 10 times@step(retry_policy=ConstantDelayRetryPolicy(delay=5, maximum_attempts=10))async def flaky_step(self, ctx: Context, ev: StartEvent) -> StopEvent:result = flaky_call() # this might raisereturn StopEvent(result=result)
您可以查看API文档以获取策略的详细描述 框架中可用。如果您找不到适合您的用例的策略,您可以轻松编写 自定义策略的唯一要求是编写一个尊重RetryPolicy
协议。换句话说,您的自定义策略类必须具有具有以下签名的方法:
def next(self, elapsed_time: float, attempts: int, error: Exception
) -> Optional[float]:...
例如,这是一个重试策略,它对周末感到兴奋,并且仅在周五重试一个步骤:
from datetime import datetimeclass RetryOnFridayPolicy:def next(self, elapsed_time: float, attempts: int, error: Exception) -> Optional[float]:if datetime.today().strftime("%A") == "Friday":# retry in 5 secondsreturn 5# tell the workflow we don't want to retryreturn None
八、逐步执行
工作流具有用于逐步执行的内置实用程序,允许您随着事情的进展控制执行和调试状态。
w = JokeFlow(...)# Kick off the workflow
handler = w.run(topic="Pirates")# Iterate until done
async for _ in handler:# inspect context# val = await handler.ctx.get("key")continue# Get the final result
result = await handler
九、装饰非类函数
您还可以在不子类化的情况下将步骤装饰和附加到工作流。
下面是前面的JokeFlow
,但没有定义子类。
from llama_index.core.workflow import (Event,StartEvent,StopEvent,Workflow,step,
)
from llama_index.llms.openai import OpenAIclass JokeEvent(Event):joke: strjoke_flow = Workflow(timeout=60, verbose=True)@step(workflow=joke_flow)
async def generate_joke(ev: StartEvent) -> JokeEvent:topic = ev.topicprompt = f"Write your best joke about {topic}."llm = OpenAI()response = await llm.acomplete(prompt)return JokeEvent(joke=str(response))@step(workflow=joke_flow)
async def critique_joke(ev: JokeEvent) -> StopEvent:joke = ev.jokeprompt = (f"Give a thorough analysis and critique of the following joke: {joke}")response = await llm.acomplete(prompt)return StopEvent(result=str(response))
十、跨运行维护上下文
如您所见,工作流有一个Context
对象,可用于跨步骤维护状态。
如果要在工作流的多次运行中维护状态,可以将先前的上下文传递给.run()
方法。
handler = w.run()
result = await handler# continue with next run
handler = w.run(ctx=handler.ctx)
result = await handler
十一、部署工作流
您可以将工作流部署为具有llama_deploy(repo)的多代理服务。每个代理服务都通过控制面进行编排,并通过消息队列进行通信。在本地或库伯内特斯上部署。
十二、例子
您可以在下面的笔记本中找到许多使用工作流的有用示例:
- 高级文本SQL
- 引文查询引擎
- 常见的工作流模式
- 校正RAG
- 函数调用代理
- 循环中的人类:故事制作
- JSON查询引擎
- 长RAG
- 多步骤查询引擎
- 多策略工作流程
- RAG+重新排序
- 反应代理
- 可靠的结构化发电
- 路由器查询引擎
- 自我发现工作流程
- 子问题查询引擎
- 利用并发
2024-09-24(二)