Autogen_core源码:_agent_runtime.py

目录

    • _agent_runtime.py代码
    • 代码解释
      • 代码逻辑概述
      • 具体功能解释
        • 1. 发送消息
        • 2. 发布消息
        • 3. 注册代理工厂
        • 4. 获取底层代理实例
        • 5. 获取代理ID
        • 6. 保存和加载运行时状态
        • 7. 获取代理元数据
        • 8. 保存和加载单个代理状态
        • 9. 添加和移除订阅
        • 10. 添加消息序列化器
      • 总结
    • 代码示例
      • 示例 1:发送消息
      • 示例 2:发布消息
      • 示例 3:注册代理工厂
      • 示例 4:获取代理实例
      • 示例 5:保存和加载运行时状态
      • 示例 6:添加和移除订阅
      • 示例 7:添加消息序列化器

_agent_runtime.py代码

from __future__ import annotationsfrom collections.abc import Sequence
from typing import Any, Awaitable, Callable, Mapping, Protocol, Type, TypeVar, overload, runtime_checkablefrom ._agent import Agent
from ._agent_id import AgentId
from ._agent_metadata import AgentMetadata
from ._agent_type import AgentType
from ._cancellation_token import CancellationToken
from ._serialization import MessageSerializer
from ._subscription import Subscription
from ._topic import TopicId# Undeliverable - errorT = TypeVar("T", bound=Agent)@runtime_checkable
class AgentRuntime(Protocol):async def send_message(self,message: Any,recipient: AgentId,*,sender: AgentId | None = None,cancellation_token: CancellationToken | None = None,message_id: str | None = None,) -> Any:"""Send a message to an agent and get a response.Args:message (Any): The message to send.recipient (AgentId): The agent to send the message to.sender (AgentId | None, optional): Agent which sent the message. Should **only** be None if this was sent from no agent, such as directly to the runtime externally. Defaults to None.cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress . Defaults to None.Raises:CantHandleException: If the recipient cannot handle the message.UndeliverableException: If the message cannot be delivered.Other: Any other exception raised by the recipient.Returns:Any: The response from the agent."""...async def publish_message(self,message: Any,topic_id: TopicId,*,sender: AgentId | None = None,cancellation_token: CancellationToken | None = None,message_id: str | None = None,) -> None:"""Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.No responses are expected from publishing.Args:message (Any): The message to publish.topic (TopicId): The topic to publish the message to.sender (AgentId | None, optional): The agent which sent the message. Defaults to None.cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress. Defaults to None.message_id (str | None, optional): The message id. If None, a new message id will be generated. Defaults to None. This message id must be unique. and is recommended to be a UUID.Raises:UndeliverableException: If the message cannot be delivered."""...async def register_factory(self,type: str | AgentType,agent_factory: Callable[[], T | Awaitable[T]],*,expected_class: type[T] | None = None,) -> AgentType:"""Register an agent factory with the runtime associated with a specific type. The type must be unique. This API does not add any subscriptions... note::This is a low level API and usually the agent class's `register` method should be used instead, as this also handles subscriptions automatically.Example:.. code-block:: pythonfrom dataclasses import dataclassfrom autogen_core import AgentRuntime, MessageContext, RoutedAgent, eventfrom autogen_core.models import UserMessage@dataclassclass MyMessage:content: strclass MyAgent(RoutedAgent):def __init__(self) -> None:super().__init__("My core agent")@eventasync def handler(self, message: UserMessage, context: MessageContext) -> None:print("Event received: ", message.content)async def my_agent_factory():return MyAgent()async def main() -> None:runtime: AgentRuntime = ...  # type: ignoreawait runtime.register_factory("my_agent", lambda: MyAgent())import asyncioasyncio.run(main())Args:type (str): The type of agent this factory creates. It is not the same as agent class name. The `type` parameter is used to differentiate between different factory functions rather than agent classes.agent_factory (Callable[[], T]): The factory that creates the agent, where T is a concrete Agent type. Inside the factory, use `autogen_core.AgentInstantiationContext` to access variables like the current runtime and agent ID.expected_class (type[T] | None, optional): The expected class of the agent, used for runtime validation of the factory. Defaults to None."""...# TODO: uncomment out the following type ignore when this is fixed in mypy: https://github.com/python/mypy/issues/3737async def try_get_underlying_agent_instance(self, id: AgentId, type: Type[T] = Agent) -> T:  # type: ignore[assignment]"""Try to get the underlying agent instance by name and namespace. This is generally discouraged (hence the long name), but can be useful in some cases.If the underlying agent is not accessible, this will raise an exception.Args:id (AgentId): The agent id.type (Type[T], optional): The expected type of the agent. Defaults to Agent.Returns:T: The concrete agent instance.Raises:LookupError: If the agent is not found.NotAccessibleError: If the agent is not accessible, for example if it is located remotely.TypeError: If the agent is not of the expected type."""...@overloadasync def get(self, id: AgentId, /, *, lazy: bool = ...) -> AgentId: ...@overloadasync def get(self, type: AgentType | str, /, key: str = ..., *, lazy: bool = ...) -> AgentId: ...async def get(self, id_or_type: AgentId | AgentType | str, /, key: str = "default", *, lazy: bool = True) -> AgentId: ...async def save_state(self) -> Mapping[str, Any]:"""Save the state of the entire runtime, including all hosted agents. The only way to restore the state is to pass it to :meth:`load_state`.The structure of the state is implementation defined and can be any JSON serializable object.Returns:Mapping[str, Any]: The saved state."""...async def load_state(self, state: Mapping[str, Any]) -> None:"""Load the state of the entire runtime, including all hosted agents. The state should be the same as the one returned by :meth:`save_state`.Args:state (Mapping[str, Any]): The saved state."""...async def agent_metadata(self, agent: AgentId) -> AgentMetadata:"""Get the metadata for an agent.Args:agent (AgentId): The agent id.Returns:AgentMetadata: The agent metadata."""...async def agent_save_state(self, agent: AgentId) -> Mapping[str, Any]:"""Save the state of a single agent.The structure of the state is implementation defined and can be any JSON serializable object.Args:agent (AgentId): The agent id.Returns:Mapping[str, Any]: The saved state."""...async def agent_load_state(self, agent: AgentId, state: Mapping[str, Any]) -> None:"""Load the state of a single agent.Args:agent (AgentId): The agent id.state (Mapping[str, Any]): The saved state."""...async def add_subscription(self, subscription: Subscription) -> None:"""Add a new subscription that the runtime should fulfill when processing published messagesArgs:subscription (Subscription): The subscription to add"""...async def remove_subscription(self, id: str) -> None:"""Remove a subscription from the runtimeArgs:id (str): id of the subscription to removeRaises:LookupError: If the subscription does not exist"""...def add_message_serializer(self, serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) -> None:"""Add a new message serialization serializer to the runtimeNote: This will deduplicate serializers based on the type_name and data_content_type propertiesArgs:serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]): The serializer/s to add"""...

代码解释

这段Python代码定义了一个名为AgentRuntime的协议(Protocol),该协议描述了一个代理运行时环境应具备的功能接口。下面详细解释代码逻辑和完成的功能:

代码逻辑概述

  1. 导入模块:导入了一系列必要的模块和类型,包括collections.abc中的Sequencetyping模块中的各种类型注解,以及自定义的一些模块如_agent_agent_id等。
  2. 定义类型变量:定义了一个类型变量T,它必须是Agent类或其子类。
  3. 定义AgentRuntime协议:使用@runtime_checkable装饰器定义了一个AgentRuntime协议,该协议包含了多个异步方法,这些方法描述了代理运行时环境应具备的功能。

具体功能解释

1. 发送消息
async def send_message(self,message: Any,recipient: AgentId,*,sender: AgentId | None = None,cancellation_token: CancellationToken | None = None,message_id: str | None = None,
) -> Any:...
  • 功能:向指定的代理发送消息并获取响应。
  • 参数
    • message:要发送的消息。
    • recipient:消息的接收代理的ID。
    • sender:消息的发送代理的ID,若为None表示消息是从外部直接发送到运行时的。
    • cancellation_token:用于取消正在进行的操作的令牌。
    • message_id:消息的唯一ID,若为None则会生成一个新的ID。
  • 异常:可能会抛出CantHandleException(接收者无法处理消息)、UndeliverableException(消息无法送达)等异常。
  • 返回值:接收代理的响应。
2. 发布消息
async def publish_message(self,message: Any,topic_id: TopicId,*,sender: AgentId | None = None,cancellation_token: CancellationToken | None = None,message_id: str | None = None,
) -> None:...
  • 功能:将消息发布到指定的主题,不期望得到响应。
  • 参数
    • message:要发布的消息。
    • topic_id:消息要发布到的主题ID。
    • sender:消息的发送代理的ID,默认为None
    • cancellation_token:用于取消正在进行的操作的令牌。
    • message_id:消息的唯一ID,若为None则会生成一个新的ID。
  • 异常:可能会抛出UndeliverableException(消息无法送达)异常。
3. 注册代理工厂
async def register_factory(self,type: str | AgentType,agent_factory: Callable[[], T | Awaitable[T]],*,expected_class: type[T] | None = None,
) -> AgentType:...
  • 功能:向运行时注册一个代理工厂,该工厂与特定类型关联,类型必须唯一。
  • 参数
    • type:代理的类型,可以是字符串或AgentType对象。
    • agent_factory:创建代理的工厂函数,返回一个代理实例或一个可等待的代理实例。
    • expected_class:代理的预期类,用于运行时验证工厂。
  • 返回值:注册的代理类型。
4. 获取底层代理实例
async def try_get_underlying_agent_instance(self, id: AgentId, type: Type[T] = Agent) -> T:...
  • 功能:尝试根据代理ID获取底层代理实例。
  • 参数
    • id:代理的ID。
    • type:代理的预期类型,默认为Agent
  • 异常:可能会抛出LookupError(代理未找到)、NotAccessibleError(代理不可访问)、TypeError(代理类型不符合预期)等异常。
  • 返回值:具体的代理实例。
5. 获取代理ID
@overload
async def get(self, id: AgentId, /, *, lazy: bool =...) -> AgentId:...@overload
async def get(self, type: AgentType | str, /, key: str =..., *, lazy: bool =...) -> AgentId:...async def get(self, id_or_type: AgentId | AgentType | str, /, key: str = "default", *, lazy: bool = True
) -> AgentId:...
  • 功能:根据代理ID或类型获取代理ID。
  • 参数
    • id_or_type:可以是代理ID、代理类型或类型字符串。
    • key:键,默认为"default"
    • lazy:是否懒加载,默认为True
  • 返回值:代理ID。
6. 保存和加载运行时状态
async def save_state(self) -> Mapping[str, Any]:...async def load_state(self, state: Mapping[str, Any]) -> None:...
  • save_state功能:保存整个运行时的状态,包括所有托管代理的状态。
  • 返回值:保存的状态,是一个JSON可序列化的对象。
  • load_state功能:加载之前保存的运行时状态。
  • 参数state为之前保存的状态。
7. 获取代理元数据
async def agent_metadata(self, agent: AgentId) -> AgentMetadata:...
  • 功能:获取指定代理的元数据。
  • 参数agent为代理的ID。
  • 返回值:代理的元数据。
8. 保存和加载单个代理状态
async def agent_save_state(self, agent: AgentId) -> Mapping[str, Any]:...async def agent_load_state(self, agent: AgentId, state: Mapping[str, Any]) -> None:...
  • agent_save_state功能:保存单个代理的状态。
  • 参数agent为代理的ID。
  • 返回值:保存的代理状态,是一个JSON可序列化的对象。
  • agent_load_state功能:加载单个代理的状态。
  • 参数agent为代理的ID,state为之前保存的代理状态。
9. 添加和移除订阅
async def add_subscription(self, subscription: Subscription) -> None:...async def remove_subscription(self, id: str) -> None:...
  • add_subscription功能:向运行时添加一个新的订阅,运行时在处理发布的消息时会满足该订阅。
  • 参数subscription为要添加的订阅。
  • remove_subscription功能:从运行时移除一个订阅。
  • 参数id为要移除的订阅的ID。
  • 异常:若订阅不存在,会抛出LookupError异常。
10. 添加消息序列化器
def add_message_serializer(self, serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) -> None:...
  • 功能:向运行时添加一个或多个消息序列化器,会根据type_namedata_content_type属性对序列化器进行去重。
  • 参数serializer为要添加的序列化器或序列化器序列。

总结

AgentRuntime协议定义了一个代理运行时环境的接口,包括消息发送和发布、代理工厂注册、状态保存和加载、订阅管理以及消息序列化器管理等功能,为实现具体的代理运行时环境提供了规范。

代码示例

示例 1:发送消息

import asyncio
from autogen_core import AgentRuntime,Agent, AgentId, TopicId,SingleThreadedAgentRuntime, Subscriptionasync def send_message_example(runtime: AgentRuntime):recipient = AgentId("recipient_agent", "default")message = {"content": "Hello, recipient!"}try:response = await runtime.send_message(message, recipient)print("Response received:", response)except Exception as e:print(f"Error sending message: {e}")async def main():runtime: AgentRuntime = SingleThreadedAgentRuntime()  await send_message_example(runtime)await main()
Error sending message: Recipient not found

示例 2:发布消息

async def publish_message_example(runtime: AgentRuntime):topic = TopicId("example_topic","default")message = {"content": "This is a published message!"}sender = AgentId("sender_agent", "default")try:await runtime.publish_message(message, topic, sender=sender)print("Message published successfully.")except Exception as e:print(f"Error publishing message: {e}")async def main():runtime: AgentRuntime =SingleThreadedAgentRuntime()  await publish_message_example(runtime)await main()
Message published successfully.

示例 3:注册代理工厂

class MyAgent(Agent):def __init__(self):super().__init__("My Agent")async def my_agent_factory():return MyAgent()async def register_factory_example(runtime: AgentRuntime):agent_type = await runtime.register_factory("my_agent_type", my_agent_factory)print(f"Agent factory registered for type: {agent_type}")async def main():runtime: AgentRuntime =SingleThreadedAgentRuntime()await register_factory_example(runtime)await main()
Agent factory registered for type: AgentType(type='my_agent_type')

示例 4:获取代理实例

async def get_agent_instance_example(runtime: AgentRuntime):agent_id = AgentId("my_agent", "default")try:agent = await runtime.try_get_underlying_agent_instance(agent_id, Agent)print(f"Agent instance retrieved: {agent}")except Exception as e:print(f"Error getting agent instance: {e}")async def main():runtime: AgentRuntime =SingleThreadedAgentRuntime()await get_agent_instance_example(runtime)await main()
Error getting agent instance: Agent with name my_agent not found.

示例 5:保存和加载运行时状态

async def save_load_state_example(runtime: AgentRuntime):# 保存状态state = await runtime.save_state()print("Runtime state saved.")# 加载状态await runtime.load_state(state)print("Runtime state loaded.")async def main():runtime: AgentRuntime =SingleThreadedAgentRuntime()await save_load_state_example(runtime)await main()
Runtime state saved.
Runtime state loaded.

示例 6:添加和移除订阅

async def subscription_example(runtime: AgentRuntime):from autogen_core import TypeSubscriptionsubscription = TypeSubscription(topic_type="t1", agent_type="a1")# 添加订阅await runtime.add_subscription(subscription)print("Subscription added.")# 移除订阅try:await runtime.remove_subscription(subscription.id)print("Subscription removed.")except Exception as e:print(f"Error removing subscription: {e}")async def main():runtime: AgentRuntime =SingleThreadedAgentRuntime()  # 假设这里已经有一个 AgentRuntime 实例await subscription_example(runtime)await main()
Subscription added.
Subscription removed.

示例 7:添加消息序列化器

from autogen_core import AgentRuntime, MessageSerializer
from typing import Anyclass MyMessageSerializer(MessageSerializer[Any]):def serialize(self, message: Any) -> bytes:# 实现序列化逻辑passdef deserialize(self, data: bytes) -> Any:# 实现反序列化逻辑passruntime: AgentRuntime =SingleThreadedAgentRuntime()  
serializer = MyMessageSerializer()
runtime.add_message_serializer(serializer)
print("Message serializer added.")
Message serializer added.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/10345.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python-leetcode-旋转链表

61. 旋转链表 - 力扣(LeetCode) # Definition for singly-linked list. # class ListNode: # def __init__(self, val0, nextNone): # self.val val # self.next next class Solution:def rotateRight(self, head: Optional[ListN…

基于排队理论的物联网发布/订阅通信系统建模与优化

论文标题 英文标题:Queuing Theory-Based Modeling and Optimization of a Publish/Subscribe IoT Communication System 中文标题:基于排队理论的物联网发布/订阅通信系统建模与优化 作者信息 Franc Pouhela Anthony Kiggundu Hans D. Schotten …

[EAI-026] DeepSeek-VL2 技术报告解读

Paper Card 论文标题:DeepSeek-VL2: Mixture-of-Experts Vision-Language Models for Advanced Multimodal Understanding 论文作者:Zhiyu Wu, Xiaokang Chen, Zizheng Pan, Xingchao Liu, Wen Liu, Damai Dai, Huazuo Gao, Yiyang Ma, Chengyue Wu, Bin…

基于Python的药物相互作用预测模型AI构建与优化(下.代码部分)

四、特征工程 4.1 分子描述符计算 分子描述符作为量化分子性质的关键数值,能够从多维度反映药物分子的结构和化学特征,在药物相互作用预测中起着举足轻重的作用。RDKit 库凭借其强大的功能,为我们提供了丰富的分子描述符计算方法,涵盖了多个重要方面的分子性质。 分子量…

【算法】动态规划专题① ——线性DP python

目录 引入简单实现稍加变形举一反三实战演练总结 引入 楼梯有个台阶,每次可以一步上1阶或2阶。一共有多少种不同的上楼方法? 怎么去思考? 假设就只有1个台阶,走法只有:1 只有2台阶: 11,2 只有3台…

0 基础学运维:解锁 K8s 云计算运维工程师成长密码

前言:作为一个过来人,我曾站在技术的门槛之外,连电脑运行内存和内存空间都傻傻分不清,完完全全的零基础。但如今,我已成长为一名资深的k8s云计算运维工程师。回顾这段历程,我深知踏上这条技术之路的艰辛与不…

事务03之MVCC机制

MVCC 多版本并发控制机制 文章目录 MVCC 多版本并发控制机制一:并发事务的场景1:读读场景2:写写场景3:读写 or 写读场景 二:MVCC机制综述1:MVCC日常生活的体现2:多版本并发控制 三:M…

数据结构-Stack和栈

1.栈 1.1什么是栈 栈是一种特殊的线性表,只允许在固定的一段进行插入和删除操作,进行插入和删除操作的一段称为栈顶,另一端称为栈底。 栈中的数据元素遵顼后进先出LIFO(Last In First Out)的原则,就像一…

langchain基础(二)

一、输出解析器(Output Parser) 作用:(1)让模型按照指定的格式输出; (2)解析模型输出,提取所需的信息 1、逗号分隔列表 CommaSeparatedListOutputParser:…

AI编程:如何编写提示词

这是小卷对AI编程工具学习的第2篇文章,今天讲讲如何编写AI编程的提示词,并结合实际功能需求案例来进行开发 1.编写提示词的技巧 好的提示词应该是:目标清晰明确,具有针对性,能引导模型理解问题 下面是两条提示词的对…

【B站保姆级视频教程:Jetson配置YOLOv11环境(五)Miniconda安装与配置】

B站同步视频教程:https://www.bilibili.com/video/BV1MwFDeyEYC/ 文章目录 0. Anaconda vs Miniconda in Jetson1. 下载Miniconda32. 安装Miniconda33. 换源3.1 conda 换源3.2 pip 换源 4. 创建环境5. 设置默认启动环境 0. Anaconda vs Miniconda in Jetson Jetson…

仿真设计|基于51单片机的无线投票系统仿真

目录 具体实现功能 设计介绍 51单片机简介 资料内容 仿真实现(protues8.7) 程序(Keil5) 全部内容 资料获取 具体实现功能 (1)投票系统分为发送端和接收端。 (2)发送端通过按…

玩转大语言模型——使用langchain和Ollama本地部署大语言模型

系列文章目录 玩转大语言模型——使用langchain和Ollama本地部署大语言模型 玩转大语言模型——ollama导入huggingface下载的模型 玩转大语言模型——langchain调用ollama视觉多模态语言模型 玩转大语言模型——使用GraphRAGOllama构建知识图谱 玩转大语言模型——完美解决Gra…

(动态规划基础 打家劫舍)leetcode 198

已知h2和h1&#xff0c;用已知推出未知 推是求答案&#xff0c;回溯是给答案 这里图片给出dfs暴力&#xff0c;再进行记录答案完成记忆化搜索&#xff0c;再转为dp数组 #include<iostream> #include<vector> #include<algorithm> //nums:2,1,1,2 //dp:2,2,…

origin如何在已经画好的图上修改数据且不改变原图像的画风和格式

例如我现在的.opju文件长这样 现在我换了数据集&#xff0c;我想修改这两个图表里对应的算法里的数据&#xff0c;但是我还想保留这图像现在的形式&#xff0c;可以尝试像下面这样做&#xff1a; 右击第一个图&#xff0c;出现下面&#xff0c;选择Book[sheet1] 选择工作簿 出…

[STM32 - 野火] - - - 固件库学习笔记 - - -十二.基本定时器

一、定时器简介 STM32 中的定时器&#xff08;TIM&#xff0c;Timer&#xff09;是其最重要的外设之一&#xff0c;广泛用于时间管理、事件计数和控制等应用。 1.1 基本功能 定时功能&#xff1a;TIM定时器可以对输入的时钟进行计数&#xff0c;并在计数值达到设定值时触发中…

Python从0到100(八十六):神经网络-ShuffleNet通道混合轻量级网络的深入介绍

前言&#xff1a; 零基础学Python&#xff1a;Python从0到100最新最全教程。 想做这件事情很久了&#xff0c;这次我更新了自己所写过的所有博客&#xff0c;汇集成了Python从0到100&#xff0c;共一百节课&#xff0c;帮助大家一个月时间里从零基础到学习Python基础语法、Pyth…

04树 + 堆 + 优先队列 + 图(D1_树(D1_基本介绍))

目录 一、什么是树&#xff1f; 二、相关术语 根结点 边 叶子结点 兄弟结点 祖先结点 结点的大小 树的层 结点的深度 结点的高度 树的高度 斜树 一、什么是树&#xff1f; 树是一种类似于链表的数据结构&#xff0c;不过链表的结点是以线性方式简单地指向其后继结…

Rust语言进阶之文件处理:std::fs用法实例(九十九)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布&#xff1a;《Android系统多媒体进阶实战》&#x1f680; 优质专栏&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a; 多媒体系统工程师系列【…

flowable expression和json字符串中的双引号内容

前言 最近做项目&#xff0c;发现了一批特殊的数据&#xff0c;即特殊字符"&#xff0c;本身输入双引号也不是什么特殊的字符&#xff0c;毕竟在存储时就是正常字符&#xff0c;只不过在编码的时候需要转义&#xff0c;转义符是\&#xff0c;然而转义符\也是特殊字符&…