LlamaIndex - Workflow

本文翻译整理自:LlamaIndex - Workflow
https://docs.llamaindex.ai/en/stable/module_guides/workflow/

文章目录

    • 一、入门
      • 1、定义工作流事件
      • 2、设置工作流类
      • 3、工作流入口点
      • 4、Workflow Exit Points
      • 5、运行工作流
    • 二、绘制工作流程
    • 三、使用全局上下文/状态
    • 四、等待多个事件
    • 五、手动触发事件
    • 六、流媒体事件
    • 七、失败的重试步骤
    • 八、逐步执行
    • 九、装饰非类函数
    • 十、跨运行维护上下文
    • 十一、部署工作流
    • 十二、例子


LlamaIndex中的Workflow是一个事件驱动的抽象,用于将多个事件链接在一起。工作流由steps组成,每个步骤负责处理某些事件类型并发出新事件。

Workflow在LlamaIndex中的工作是通过使用@step装饰器装饰函数。这用于推断每个工作流的输入和输出类型以进行验证,并确保每个步骤仅在接受的事件准备就绪时运行。

您可以创建Workflow来做任何事情!构建代理、RAG流、提取流或任何您想要的东西。

工作流也被自动检测,因此您可以使用Arize Pheonix等工具将可观察性带入每个步骤。(**注意:**可观察性适用于利用较新的检测系统的集成。用法可能会有所不同。)

小费

工作流使async成为一等公民,本页假定您在异步环境中运行。这对您来说意味着正确设置异步代码。如果您已经在FastAPI等服务器或笔记本中运行,您可以自由使用wait了!

如果您正在运行自己的python脚本,最好的做法是有一个异步切入点。

async def main():w = MyWorkflow(...)result = await w.run(...)print(result)if __name__ == "__main__":import asyncioasyncio.run(main())

一、入门

作为一个说明性示例,让我们考虑一个简单的工作流程,其中生成一个笑话,然后进行批评。

from llama_index.core.workflow import (Event,StartEvent,StopEvent,Workflow,step,
)# `pip install llama-index-llms-openai` if you don't already have it
from llama_index.llms.openai import OpenAIclass JokeEvent(Event):joke: strclass JokeFlow(Workflow):llm = OpenAI()@stepasync def generate_joke(self, ev: StartEvent) -> JokeEvent:topic = ev.topicprompt = f"Write your best joke about {topic}."response = await self.llm.acomplete(prompt)return JokeEvent(joke=str(response))@stepasync def critique_joke(self, ev: JokeEvent) -> StopEvent:joke = ev.jokeprompt = f"Give a thorough analysis and critique of the following joke: {joke}"response = await self.llm.acomplete(prompt)return StopEvent(result=str(response))w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))

这里有一些移动的部分,让我们一块一块地看。


1、定义工作流事件

class JokeEvent(Event):joke: str

事件是用户定义的pydantic对象。您可以控制属性和任何其他辅助方法。在这种情况下,我们的工作流依赖于单个用户定义的事件JokeEvent


2、设置工作流类

class JokeFlow(Workflow):llm = OpenAI(model="gpt-4o-mini")...

我们的工作流是通过子类化Workflow类来实现的。为了简单起见,我们附加了一个静态OpenAI实例。


3、工作流入口点

class JokeFlow(Workflow):...@stepasync def generate_joke(self, ev: StartEvent) -> JokeEvent:topic = ev.topicprompt = f"Write your best joke about {topic}."response = await self.llm.acomplete(prompt)return JokeEvent(joke=str(response))...

这里,我们来到工作流的入口点。虽然事件是使用定义的,但有两个特殊情况事件,StartEventStopEvent。在这里,StartEvent表示将初始工作流输入发送到哪里。

这个StartEvent是一个特殊的对象,因为它可以保存任意属性。在这里,我们使用ev.topic访问主题,如果它不存在,会引发错误。您也可以使用ev.get("topic")来处理属性可能不存在而不会引发错误的情况。

至此,您可能已经注意到,我们并没有明确告诉工作流哪些步骤处理哪些事件,而是使用@step装饰器来推断每个步骤的输入和输出类型,更进一步,这些推断的输入和输出类型也用于在运行前为您验证工作流是否有效!


4、Workflow Exit Points

class JokeFlow(Workflow):...@stepasync def critique_joke(self, ev: JokeEvent) -> StopEvent:joke = ev.jokeprompt = f"Give a thorough analysis and critique of the following joke: {joke}"response = await self.llm.acomplete(prompt)return StopEvent(result=str(response))...

在这里,我们有了工作流中的第二步,也是最后一步。我们知道这是最后一步,因为返回了特殊的StopEvent。当工作流遇到返回的StopEvent时,它会立即停止工作流并返回任何结果。

在这种情况下,结果是一个字符串,但它可以是字典、列表或任何其他对象。


5、运行工作流

w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))

最后,我们创建并运行工作流。有一些设置,如超时(以秒为单位)和冗长来帮助调试。

..run()方法是异步的,所以我们在这里使用wait来等待结果。


二、绘制工作流程

可以使用步骤定义中类型注释的强大功能来可视化工作流。您可以绘制工作流中所有可能的路径,也可以绘制最近的执行,以帮助调试。

首次安装:

pip install llama-index-utils-workflow

然后导入并使用:

from llama_index.utils.workflow import (draw_all_possible_flows,draw_most_recent_execution,
)# Draw all
draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")# Draw an execution
w = JokeFlow()
await w.run(topic="Pirates")
draw_most_recent_execution(w, filename="joke_flow_recent.html")

三、使用全局上下文/状态

或者,您可以选择在步骤之间使用全局上下文。例如,可能多个步骤访问用户的原始query输入。您可以将其存储在全局上下文中,以便每个步骤都有访问权限。

from llama_index.core.workflow import Context@step
async def query(self, ctx: Context, ev: MyEvent) -> StopEvent:# retrieve from contextquery = await ctx.get("query")# do something with context and eventval = ...result = ...# store in contextawait ctx.set("key", val)return StopEvent(result=result)

四、等待多个事件

上下文不仅仅保存数据,它还提供实用程序来缓冲和等待多个事件。

例如,您可能有一个步骤在合成响应之前等待查询和检索到的节点:

from llama_index.core import get_response_synthesizer@step
async def synthesize(self, ctx: Context, ev: QueryEvent | RetrieveEvent
) -> StopEvent | None:data = ctx.collect_events(ev, [QueryEvent, RetrieveEvent])# check if we can runif data is None:return None# unpack -- data is returned in orderquery_event, retrieve_event = data# run response synthesissynthesizer = get_response_synthesizer()response = synthesizer.synthesize(query_event.query, nodes=retrieve_event.nodes)return StopEvent(result=response)

使用ctx.collect_events()我们可以缓冲并等待所有预期的事件到达。这个函数只会在所有事件到达后返回数据(按请求的顺序)。


五、手动触发事件

通常,事件是通过在步骤中返回另一个事件来触发的。但是,也可以使用工作流中的ctx.send_event(event)方法手动调度事件。

这是一个简短的玩具示例,展示了如何使用它:

from llama_index.core.workflow import step, Context, Event, Workflowclass MyEvent(Event):passclass MyEventResult(Event):result: strclass GatherEvent(Event):passclass MyWorkflow(Workflow):@stepasync def dispatch_step(self, ctx: Context, ev: StartEvent) -> MyEvent | GatherEvent:ctx.send_event(MyEvent())ctx.send_event(MyEvent())return GatherEvent()@stepasync def handle_my_event(self, ev: MyEvent) -> MyEventResult:return MyEventResult(result="result")@stepasync def gather(self, ctx: Context, ev: GatherEvent | MyEventResult) -> StopEvent | None:# wait for events to finishevents = ctx.collect_events([MyEventResult, MyEventResult])if not events:return Nonereturn StopEvent(result=events)

六、流媒体事件

您还可以在事件进入时迭代它们。这对于流式传输、显示进度或调试很有用。

w = MyWorkflow(...)handler = w.run(topic="Pirates")async for event in handler.stream_events():print(event)result = await handler

七、失败的重试步骤

执行失败的步骤可能会导致整个工作流失败,但通常会出现错误,可以安全地重试执行。想象一下,由于网络暂时拥塞而超时的HTTP请求,或者遇到速率限制的外部API调用。

对于所有希望该步骤重试的情况,您可以使用“重试策略”。重试策略是一个对象,它指示工作流多次执行一个步骤,规定在新的尝试之前必须经过多长时间。策略考虑了自第一次故障以来经过了多长时间,连续发生了多少次故障,以及最后发生了哪个错误。

要为特定步骤设置策略,您所要做的就是将策略对象传递给 @step 装饰器:

from llama_index.core.workflow.retry_policy import ConstantDelayRetryPolicyclass MyWorkflow(Workflow):# ...more workflow definition...# This policy will retry this step on failure every 5 seconds for at most 10 times@step(retry_policy=ConstantDelayRetryPolicy(delay=5, maximum_attempts=10))async def flaky_step(self, ctx: Context, ev: StartEvent) -> StopEvent:result = flaky_call()  # this might raisereturn StopEvent(result=result)

您可以查看API文档以获取策略的详细描述 框架中可用。如果您找不到适合您的用例的策略,您可以轻松编写 自定义策略的唯一要求是编写一个尊重RetryPolicy 协议。换句话说,您的自定义策略类必须具有具有以下签名的方法:

def next(self, elapsed_time: float, attempts: int, error: Exception
) -> Optional[float]:...

例如,这是一个重试策略,它对周末感到兴奋,并且仅在周五重试一个步骤:

from datetime import datetimeclass RetryOnFridayPolicy:def next(self, elapsed_time: float, attempts: int, error: Exception) -> Optional[float]:if datetime.today().strftime("%A") == "Friday":# retry in 5 secondsreturn 5# tell the workflow we don't want to retryreturn None

八、逐步执行

工作流具有用于逐步执行的内置实用程序,允许您随着事情的进展控制执行和调试状态。

w = JokeFlow(...)# Kick off the workflow
handler = w.run(topic="Pirates")# Iterate until done
async for _ in handler:# inspect context# val = await handler.ctx.get("key")continue# Get the final result
result = await handler

九、装饰非类函数

您还可以在不子类化的情况下将步骤装饰和附加到工作流。

下面是前面的JokeFlow,但没有定义子类。

from llama_index.core.workflow import (Event,StartEvent,StopEvent,Workflow,step,
)
from llama_index.llms.openai import OpenAIclass JokeEvent(Event):joke: strjoke_flow = Workflow(timeout=60, verbose=True)@step(workflow=joke_flow)
async def generate_joke(ev: StartEvent) -> JokeEvent:topic = ev.topicprompt = f"Write your best joke about {topic}."llm = OpenAI()response = await llm.acomplete(prompt)return JokeEvent(joke=str(response))@step(workflow=joke_flow)
async def critique_joke(ev: JokeEvent) -> StopEvent:joke = ev.jokeprompt = (f"Give a thorough analysis and critique of the following joke: {joke}")response = await llm.acomplete(prompt)return StopEvent(result=str(response))

十、跨运行维护上下文

如您所见,工作流有一个Context对象,可用于跨步骤维护状态。

如果要在工作流的多次运行中维护状态,可以将先前的上下文传递给.run()方法。

handler = w.run()
result = await handler# continue with next run
handler = w.run(ctx=handler.ctx)
result = await handler

十一、部署工作流

您可以将工作流部署为具有llama_deploy(repo)的多代理服务。每个代理服务都通过控制面进行编排,并通过消息队列进行通信。在本地或库伯内特斯上部署。


十二、例子

您可以在下面的笔记本中找到许多使用工作流的有用示例:

  • 高级文本SQL
  • 引文查询引擎
  • 常见的工作流模式
  • 校正RAG
  • 函数调用代理
  • 循环中的人类:故事制作
  • JSON查询引擎
  • 长RAG
  • 多步骤查询引擎
  • 多策略工作流程
  • RAG+重新排序
  • 反应代理
  • 可靠的结构化发电
  • 路由器查询引擎
  • 自我发现工作流程
  • 子问题查询引擎
  • 利用并发

2024-09-24(二)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/430933.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

单片机带隙电压基准电路

单片机带隙电压基准电路 一、带隙电压基准电路概述 带隙电压基准电路在单片机中占据着至关重要的地位。它能够为各种模拟集成电路提供稳定的参考电压,确保电路的正常运行。例如,在高精度的比较器中,带隙电压基准电路可以提供一个精确的参考…

linux 的 sed 命令的 使用学习

(1) sed 概述: (2) 首先谢谢 b 站这位老师,这位专家的完美讲解 讲解继续: (3) 关于 sed 里的模式: (4) sed 支持的常用的对文本编辑的…

Matlab|考虑柔性负荷的综合能源系统低碳经济优化调度

目录 1 主要内容 2 部分代码 3 程序结果 4 下载链接 1 主要内容 程序主要实现的是考虑柔性负荷的综合能源系统低碳经济优化调度,模型参考《考虑柔性负荷的综合能源系统低碳经济优化调度》,求解方法采用的是混合整数规划算法,通过matlabc…

【设计模式】UML类图

目录 前言 一、类图概述 二、类图的作用 三、类图表示法 四、类之间关系的表示方法 1. 关联关系 1.1 单向关联 1.2 双向关联 1.3 自关联 2. 聚合关系 3. 组合关系 4. 依赖关系 5. 继承关系 6. 实现关系 总结 前言 统一建模语言( Unified Modeling La…

如何快速上手一个Github的开源项目

程序研发领域正是有一些热衷开源的小伙伴,技能迭代才能如此的迅速,因此,快速上手一个GitHub上的开源项目,基本上已经变成很个程序员小伙伴必须掌握的技能,因为终究你会应用到其中的一个或多个项目,帮助自己…

【资源一号04A卫星(中巴地球资源卫星04A星)】

资源一号04A卫星(中巴地球资源卫星04A星) 资源一号04A卫星,全称为中巴地球资源卫星04A星(CBERS-04A),是中国与巴西两国合作研制的第六颗地球资源卫星。以下是对该卫星的详细介绍: 一、基本信…

打造灵活DateTimePicker日期时间选择器组件:轻松实现时间的独立清除功能

element ui中日期和时间选择器(DateTimePicker)是一个常见且重要的组件。它允许用户轻松地选择日期和时间,极大地提升了用户体验。然而,在某些场景下,用户可能需要更细粒度的控制,例如单独清除已选择的时间…

【资源一号02C卫星】

资源一号02C卫星 资源一号02C卫星是中国航天科技集团公司所属中国空间技术研究院负责研制生产的一颗重要遥感卫星。以下是关于该卫星的详细介绍: 一、基本信息 发射时间:2011年12月22日11时26分发射地点:中国太原卫星发射中心运载火箭&am…

基于区块链的相亲交易系统源码解析

随着区块链技术的成熟与发展,其去中心化、不可篡改的特性逐渐被应用于各行各业。特别是在婚恋市场中,区块链技术的应用为相亲平台带来了新的可能性 。本文将探讨如何利用区块链技术构建一个透明、高效的相亲交易系统,并提供部分源码示例。 区…

提前解锁 Vue 3.5 的新特性

Vue 3.5 是 Vue.js 新发布的版本,虽然没有引入重大变更,但带来了许多实用的增强功能、内部优化和性能改进。 1. 响应式系统优化 Vue 3.5 进一步优化了响应式系统的性能,并且减少内存占用。尤其在处理大型或深度嵌套的响应式数组时&#xff…

Contact Form 7最新5.9.8版错误修复方案

最近有多位用户反应Contact Form 7最新5.9.8版的管理页面有错误如下图所示 具体错误文件的路径为wp-content\plugins\contact-form-7\admin\includes\welcome-panel.php on line 153 找到welcome-panel.php这个文件编辑它,将如下图选中的部分删除 删除以后&#xf…

显示和隐藏图片【JavaScript】

使用 JavaScript 来实现显示和隐藏图片。下面是一个简单的示例&#xff0c;展示如何通过按钮点击来切换图片的可见性。 实现效果: 代码&#xff1a; <!DOCTYPE html> <html lang"zh"><head><meta charset"UTF-8"><meta name&…

python爬虫案例——抓取链家租房信息

文章目录 1、任务目标2、分析网页3、编写代码1、任务目标 目标站点:链家租房版块(https://bj.lianjia.com/zufang/) 要求:抓取该链接下前5页所有的租房信息,包括:标题、详情信息、详情链接、价格 如: 2、分析网页 用浏览器打开链接,按F12或右键检查,进入开发者模式;因…

计算机毕业设计 美发管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

OpenAI converting API code from GPT-3 to chatGPT-3.5

题意&#xff1a;将OpenAI API代码从GPT-3转换为ChatGPT-3.5 问题背景&#xff1a; Below is my working code for the GPT-3 API. I am having trouble converting it to work with chatGPT-3.5. 以下是我用于GPT-3 API的工作代码。我在将其转换为适用于ChatGPT-3.5时遇到了…

前端开发之装饰器模式

介绍 装饰器模式 是在不修改对象内部结构的情况下&#xff0c;动态地给对象添加功能的一种设计模式。在软件开发中&#xff0c;有时候我们需要为已有对象添加一些额外的行为&#xff0c;但不希望修改该对象的代码&#xff0c;装饰器模式可以很好的满足这一需求。 在TypeScrip…

echarts map地图动态下钻,自定义标注,自定义tooltip弹窗【完整demo版本】

在数据可视化中&#xff0c;地图是很重要的一个环节&#xff0c;很多时候需要展现的不仅是国家地图&#xff0c;还需要能从国家进入到省市。这个逐级进入的过程就是我们今天说的地图下钻。 地图下钻看起来很屌、很高大上&#xff0c;但是仔细琢磨一下&#xff0c;技术实现上真的…

Cpp类和对象(下)(6)

文章目录 前言一、初始化列表概念使用注意实际运用explicit关键字初始化列表的总结 二、static成员static成员的概念static成员的特性static的一个实用场景 三、友元友元函数友元类 四、内部类概念特性 五、匿名对象六、再次理解封装和面向对象总结 前言 Hello&#xff0c;本篇…

『玉竹』基于Laravel 开发的博客、微博客系统和Android App

基于 Laravel 和 Filament 开发, 使用 Filament 开发管理后台&#xff0c;前端比较简洁。 博客大家都清楚是什么东西&#xff0c;微博客类似于微博之类的吧&#xff0c;有时候想要写的东西可能只有几句话&#xff0c;想要起个标题都不好起。 为了是微博客功能更好用&#xff0c…

【小程序】微信小程序课程 -3 快速上手之常用方法

目录 1、 对话框 1.1 模态对话框 1.2 消息对话框 2、 存储 2.1 同步 2.1.1 同步保存数据 2.1.2 同步获取数据 2.1.3 同步删除数据 2.1.4 同步清空数据 2.2 异步 2.2.1 异步保存数据 2.2.2 异步获取数据 2.2.3 异步删除数据 2.2.4 异步清空数据 3、 上拉加载更多…