【AI Infra】【RLHF框架】二、VeRL中colocate实现解析

​ colocate的作用是使多个Worker共享相同的资源池。当然,目前verl中所有模型的Worker都共享相同的资源池:global_pool。这篇博客主要通过例子和源代码理解verl中colocate的实现,需要一些前置知识。建议先阅读

【AI Infra】【RLHF框架】一、VeRL中基于Ray的执行流程源码解析

一、一个例子

​ 这里简单修改了verl的单元测试作为示例,先直观感受下colocate的作用。

import rayfrom verl.single_controller.base import Worker
from verl.single_controller.base.decorator import register, Dispatch
from verl.single_controller.ray.base import RayResourcePool, RayClassWithInitArgs, RayWorkerGroup, create_colocated_worker_clsfrom verl import DataProto@ray.remote
class Actor(Worker):def __init__(self) -> None:super().__init__()@register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO)def add(self, data: DataProto):data.batch['a'] = data.batch['a'].to("cuda")data.batch['a'] += self.rankreturn data@ray.remote
class Critic(Worker):def __init__(self, config) -> None:super().__init__()self.config = config@register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO)def sub(self, data: DataProto):data.batch['a'] = data.batch['a'].to("cuda")data.batch['a'] -= self.config['b']return datadef test_colocated_workers():ray.init()import torch# 构建一个DataProto,其中属性a是维度为10的零向量。data = DataProto.from_dict({'a': torch.zeros(10)})print(data.batch["a"])# 利用RayClassWithInitArgs将自定义的worker和参数封装起来actor_cls = RayClassWithInitArgs(cls=Actor)critic_cls = RayClassWithInitArgs(cls=Critic, config={'b': 10})# 定义资源池,仅包含一个2GPU的节点resource_pool = RayResourcePool(process_on_nodes=[2])# 利用create_colocated_worker_cls将自定义的两个worker绑定到WorkerDict上cls_dict = {'actor': actor_cls, 'critic': critic_cls}ray_cls_with_init = create_colocated_worker_cls(cls_dict)# 启动WorkerDictwg_dict = RayWorkerGroup(resource_pool=resource_pool, ray_cls_with_init=ray_cls_with_init)# 分别获取actor和critic的workergroupspawn_wg = wg_dict.spawn(prefix_set=cls_dict.keys())colocated_actor_wg = spawn_wg['actor']colocated_critic_wg = spawn_wg['critic']# actor执行add、critic执行subactor_output = colocated_actor_wg.add(data)critic_output = colocated_critic_wg.sub(data)# actor_output.batch["a"]==[0, 0, 0, 0, 0, 1, 1, 1, 1, 1]# critic_output.batch["a"]==[-10, -10, -10, -10, -10, -10, -10, -10, -10, -10]print(actor_output.batch["a"])print(critic_output.batch["a"])ray.shutdown()if __name__ == '__main__':test_colocated_workers()

1. Actor和Critic的解释

ActorCritic的定义比较简单,在addsub方法上使用了装饰器@register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO)。这个装饰器的作用就是数据并行,即将数据chunk后交由定义worker的两个实例分别计算后,再将结果合并。下面是Actor执行过程的示意图,Crtic类似:
在这里插入图片描述

2. Actor和Critic绑定至WorkerDict

​ 通过打印一些元信息,可知ray_cls_with_initRayClassWithInitArgs,其中持有cls是一个定义为WorkerDict的类,其基类同样是Worker

cls_dict = {'actor': actor_cls, 'critic': critic_cls}
ray_cls_with_init = create_colocated_worker_cls(cls_dict)
print(type(ray_cls_with_init)) # RayClassWithInitArgs
print(ray_cls_with_init.cls.__ray_actor_class__) # WorkerDict
print(ray_cls_with_init.cls.__ray_actor_class__.__base__) # Worker
print(ray_cls_with_init.cls.actor_add)
print(ray_cls_with_init.cls.critic_sub)

在这里插入图片描述

3. 启动WorkerDict并执行操作

# 启动WorkerDict
wg_dict = RayWorkerGroup(resource_pool=resource_pool,ray_cls_with_init=ray_cls_with_init)
spawn_wg = wg_dict.spawn(prefix_set=cls_dict.keys())
# 获得actor和critic的RayWorkerGroup
colocated_actor_wg = spawn_wg['actor']
colocated_critic_wg = spawn_wg['critic']
# 执行
actor_output = colocated_actor_wg.add(data)
critic_output = colocated_critic_wg.sub(data)

​ 正如上一篇文章介绍,通过RayWorkerGroup启动WorkerDictspawn方法会返回一个RayWorkerGroup的字典,在这个例子中spawn_wg的值为:

{'actor': <verl.single_controller.ray.base.RayWorkerGroup object at 0x7f2efc719790>, 'critic': <verl.single_controller.ray.base.RayWorkerGroup object at 0x7f2f0e125dd0>}

​ 获得actorcriticRayWorkerGroup后直接执行操作即可。

二、create_colocated_worker_cls源码解析

​ 先了看一下稍微简化了一些的源码:

# 原始代码位于verl/single_controller/ray/base.py
def create_colocated_worker_cls(class_dict: dict[str, RayClassWithInitArgs]):# {"actor": Actor, "critic": Critic}cls_dict = {}# {'actor': {'args': (...), 'kwargs': {}}, 'critic': {'args': (...), 'kwargs': {...}}}init_args_dict = {}# worker_cls是指Actor和Critic,其实就是Workerworker_cls = Nonefor key, cls in class_dict.items():worker_cls = cls.cls.__ray_actor_class__.__base__cls_dict[key] = cls.clsinit_args_dict[key] = {'args': cls.args, 'kwargs': cls.kwargs}class WorkerDict(worker_cls):def __init__(self):super().__init__()self.worker_dict = {}for key, user_defined_cls in cls_dict.items():# 去除掉ray.remote的包装,这里user_defined_cls就是Actor和Critc两个类user_defined_cls = _unwrap_ray_remote(user_defined_cls)with patch.dict(os.environ, {'DISABLE_WORKER_INIT': '1'}):self.worker_dict[key] = user_defined_cls(*init_args_dict[key].get('args', ()), **init_args_dict[key].get('kwargs', {}))for key, user_defined_cls in cls_dict.items():user_defined_cls = _unwrap_ray_remote(user_defined_cls)_bind_workers_method_to_parent(WorkerDict, key, user_defined_cls)remote_cls = ray.remote(WorkerDict)remote_cls = RayClassWithInitArgs(cls=remote_cls)return remote_cls

1. 类WorkerDict

​ 可以看到WorkerDict是定义在create_colocated_worker_cls内部的类,其初始化方法__init__中核心就是构建self.worker_dict。在本文例子中,key就是actorcritic,对应的value就是ActorCritic的实例。

2. _bind_workers_method_to_parent

​ 一句话来说明这个函数的功能:将user_defined_cls中使用装饰器register的方法绑定到WorkerDictkey是方法绑定至WorkerDict的前缀。下面是源码:

# 原始代码位于verl/single_controller/ray/base.py
def _bind_workers_method_to_parent(cls, key, user_defined_cls):for method_name in dir(user_defined_cls):if hasattr(method, MAGIC_ATTR):# 遍历user_defined_cls的所有方法,找到使用装饰器`register`装饰的方法def generate_function(name):def func(self, *args, **kwargs):# dispatch to the actual workerreturn getattr(self.worker_dict[key], name)(*args, **kwargs)return funcfunc = generate_function(method_name)# 将原始函数`add`和`sub`的MAGIC_ATTR绑定到func上setattr(func, MAGIC_ATTR, getattr(method, MAGIC_ATTR))try:method_name_with_prefix = key + '_' + method_namesetattr(cls, method_name_with_prefix, func)except Exception as e:raise ValueError(f'Fail to set method_name {method_name}')

_bind_workers_method_to_parent实现可能比较难理解一些,先跳过generate_function

setattr(func, MAGIC_ATTR, getattr(method, MAGIC_ATTR))这行代码主要是将add或者sub的装饰器提供的信息复制到func上。

method_name_with_prefix = key + '_' + method_name
setattr(cls, method_name_with_prefix, func)

这段代码就是将Actor的方法add或者Critic的方法sub绑定至WorkerDict上。

​ 想理解generate_function的功能,最好直接看通过最终组装出的WorkerDict类。这里展示add绑定到WorkerDict的等价代码:

class WorkerDict(worker_cls):def __init__(self):...def actor_add(self, *args, **kwargs):return getattr(self.worker_dict["actor"], "add")(*args, **kwargs)

三、spawn的作用

​ 回顾一下开始例子中的这段代码:

wg_dict = RayWorkerGroup(resource_pool=resource_pool,ray_cls_with_init=ray_cls_with_init)spawn_wg = wg_dict.spawn(prefix_set=cls_dict.keys())
print(spawn_wg["actor"].workers)
print(spawn_wg["critic"].workers)

wg_dict = RayWorkerGroup(resource_pool=resource_pool,ray_cls_with_init=ray_cls_with_init)这行代码会启动两个WorkerDict的两个远程实例(这里不使用Ray中actor的称呼是因为和示例中的actor会混淆)。在调用spawn后返回了字典spawn_wg,打印spawn_wg["actor"].workersspawn_wg["critic"].workers可以发现这两个workergroup中持有的workers都是相同的。那么,spawn中作用是什么?来看下源码:

def spawn(self, prefix_set):def _rebind_actor_methods(worker_group, actor_name):prefix: str = actor_name + '_'for method_name in dir(worker_group):if method_name.startswith(prefix):# only valid when Python >= 3.9original_method_name = method_name.removeprefix(prefix)method = getattr(worker_group, method_name)setattr(worker_group, original_method_name, method)new_worker_group_dict = {}for prefix in prefix_set:# 从现有的workers中填写出名字为self._worker_names的worker并构成新的RayWorkerGroupnew_worker_group = self.from_detached(worker_names=self._worker_names,ray_cls_with_init=self.ray_cls_with_init)# 将带有前缀的方法名,移除前缀。例如:`actor_add`->`add`_rebind_actor_methods(new_worker_group, prefix)new_worker_group_dict[prefix] = new_worker_groupreturn new_worker_group_dict

self.from_detached作用是利用现有的worker构造一个新的RayWorkerGroup,其并不会启动新的Worker。_rebind_actor_methods则是将actor_add这种带前缀的方法名改为add,然后将add绑定到新的RayWorkerGroup上。

​ 所以,spawn的作用就是保证可以像非colocate那么的方式来执行具体的功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/37521.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PostgreSQL_数据表结构设计并创建

目录 前置&#xff1a; 1 数据表设计思路 2 数据表格SQL 3 创建 3.1 创建数据库 db_stock 3.2 在 pgAdmin4 中创建表 前置&#xff1a; 本博文是一个系列。在本人“数据库专栏”-》“PostgreSQL_”开头的博文 1 数据表设计思路 1 日数据来自优矿&#xff0c;优矿的数据…

植物来源药用天然产物的合成生物学研究进展-文献精读121

植物来源药用天然产物的合成生物学研究进展 摘要 大多数药用天然产物在植物中含量低微&#xff0c;提取分离困难&#xff1b;而且这些化合物一般结构复杂&#xff0c;化学合成难度大&#xff0c;还容易造成环境污染。基于合成生物学技术获得药用天然产物具有绿色环保和可持续发…

Nordic nRF 蓝牙的 Direct Test Mode (DTM) 测试介绍

目录 概述 1. 核心物理层参数 1.1 射频频率 (RF Channel Frequency) 1.2 发射功率 (TX Power) 1.3 调制方式 (Modulation) 1.4 数据包类型 (Packet Type) 1.5 测试模式 (Test Mode) 2. 参数配置方法 2.1 通过 HCI 命令配置 2.2 示例&#xff08;nRF52 系列&#xff0…

区间震荡指标

区间震荡指标的逻辑如下&#xff1a; 一、函数注解 1. Summation函数 功能&#xff1a; 计算给定价格序列Price的前Length个数据点的和&#xff0c;或在数据点数量超过Length时&#xff0c;计算滚动窗口内的价格和。 参数&#xff1a; Price(1)&#xff1a;价格序列&#…

文章防洗稿隐蔽混淆软件

如果你的文章经常被人洗稿搬运&#xff0c;那么这个小工具或许可以帮到你 基本原理: 在文章的每个字后面&#xff0c;加上一些随机的隐藏字符 人眼看不到&#xff0c;但是机器会读取到&#xff0c;如果别人是用AI工具来对你的文章进行洗稿&#xff0c;就会发现这是一堆乱码 你…

车载软件架构 --- AUTOSAR AP/CP中诊断的区别

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 周末洗了一个澡&#xff0c;换了一身衣服&#xff0c;出了门却不知道去哪儿&#xff0c;不知道去找谁&am…

百度OCR调用记录

根据说明&#xff0c;调用测试 设置注册的API Key和Secret Key 调用类&#xff08;官方文档中有&#xff09; 这里改传入路径&#xff1b; 测试问题 1.{"error_code":110,"error_msg":"Access token invalid or no longer valid"} 查到说是 …

19.哈希表的实现

1.哈希的概念 哈希(hash)⼜称散列&#xff0c;是⼀种组织数据的⽅式。从译名来看&#xff0c;有散乱排列的意思。本质就是通过哈希函数把关键字Key跟存储位置建⽴⼀个映射关系&#xff0c;查找时通过这个哈希函数计算出Key存储的位置&#xff0c;进⾏快速查找。 1.2.直接定址法…

网络编程之解除udp判断客户端是否断开

思路&#xff1a;每几秒发送一条不显示的信息&#xff0c;客户端断开则不再发送信息&#xff0c;超时则表示客户端断开连接。&#xff08;心跳包&#xff09; 服务器 #include <head.h>#define MAX_CLIENTS 100 // 最大支持100个客户端 #define TIMEOUT 5 // 5秒…

Java 大视界 -- Java 大数据在智能医疗远程会诊与专家协作中的技术支持(146)

&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎来到 青云交的博客&#xff01;能与诸位在此相逢&#xff0c;我倍感荣幸。在这飞速更迭的时代&#xff0c;我们都渴望一方心灵净土&#xff0c;而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识&#xff0c;也…

调用feapder作为子程序时setting.py文件不起作用

feaper 官方文档地址&#xff1a; 简介及安装 - feapder官方文档|feapder-document 问题&#xff1a; 在最近的开发中需要调用feapder作为主程序调用的子程序时发现自动入库时无法入库&#xff0c;通过查看日志信息发现连接数据库时被拒绝连接了&#xff0c;但是我的setting.p…

【STM32】SPI通信协议W25Q64Flash存储器芯片(学习笔记)

通信接口部分有介绍SPI&#xff1a;【STM32】USART串口协议&串口外设-学习笔记-CSDN博客 SPI通信协议 SPI通信 SPI&#xff08;Serial Peripheral Interface&#xff09;是由Motorola公司开发的一种通用数据总线四根通信线&#xff1a;SCK&#xff08;Serial Clock&…

刘强东突然发声:不该用算法压榨最底层兄弟!东哥,真正的人民企业家

今天忙了一天&#xff0c;很累&#xff0c;准备睡觉的时候&#xff0c;看到网上盛传的刘强东的朋友圈&#xff0c;东哥又在朋友圈发文了。 说实话&#xff0c;看完之后&#xff0c;感动&#xff0c;真的感动。 尤其是当我看到这两句话的时候。 1、我们所学的知识、商业模式、技…

Maven安装与环境配置

首先我们先介绍一些关于Maven的知识&#xff0c;如果着急直接看下面的安装教程。 目录 Maven介绍 Maven模型 Maven仓库 Maven安装 下载 安装步骤 Maven介绍 Apache Maven是一个项目管理和构建工具&#xff0c;它基于项目对象模型(Project Object Model , 简称: POM)的概念…

C++ 语法之数组指针

一维数组&#xff1a; 如果我们定义了一个一维数组&#xff0c;那么这个数组名&#xff0c;就是指向第一个数组元素的地址&#xff0c;也即&#xff0c;是整个数组分配的内存空间的首地址。 比如 int a[3]; 定义了一个包含三个元素的数组。因为一个int占4个字节&#xff0c;那…

021-TCMalloc

TCMalloc 以下是对TCMalloc的技术调研报告&#xff0c;结合原理、代码实现、优化参数及性能对比的综合分析&#xff1a; 一、TCMalloc核心原理 架构分层 TCMalloc采用三级缓存结构&#xff0c;具体流程参考下图&#xff1a; ┌─────────────┐ ┌───…

华为网路设备学习-16 虚拟路由器冗余协议(VRRP)

VRRP是针对干线上三层网络设备&#xff08;如&#xff1a;路由器、防火墙等&#xff09;的网络虚拟化技术&#xff0c;提供冗余和状态监测等功能。确保在网络中的单点故障发生时&#xff0c;能够快速切换到备份设备&#xff0c;从而保证网络通信的连续性和可靠性。‌ VRRP通过…

【华为Pura先锋盛典】华为Pura X“阔折叠”手机发布:首次全面搭载HarmonyOS 5

文章目录 前言一、阔感体验&#xff0c;大有不同二、鸿蒙AI&#xff0c;大有智慧三、便携出行&#xff0c;大有不同四、首款全面搭载 HarmonyOS 5 的手机五、卓越性能&#xff0c;可靠安心六、红枫影像&#xff0c;大放光彩预热&#xff1a;鸿蒙电脑HarmonyOS 5 升级计划小结 前…

算法题(103):数独

审题&#xff1a; 本题需要我们找出数独的解&#xff0c;并打印出来 时间复杂度分析&#xff1a; 本题是9*9的数独格子&#xff0c;所以数据量小于25&#xff0c;可以使用2^n的算法 思路&#xff1a; 方法一&#xff1a;深度优先搜索 首先确定搜索及插入策略&#xff1a; 我们采…

sougou AI close

sougou AI close 全局禁用《AI 汪仔》 现在丝滑流畅很多了