Python 全栈系列239 使用消息队列完成分布式任务

说明

在Python - 深度学习系列32 - glm2接口部署实践提到,通过部署本地化大模型来完成特定的任务。

由于大模型的部署依赖显卡,且常规量级的任务需要大量的worker支持,从成本考虑,租用算力机是比较经济的。由于任务是属于超高计算传输比的类型,且算力机随时可能出现不稳定的情况。

所以,使用消息队列完成此项任务是比较合适的。本次目标:

  • 1 回顾并快速搭建RabbitMQ和RabbitAgent服务的方法
  • 2 在无端口算力租用商(AutoDL)下部署chatglm2服务,并启动Worker处理数据
  • 3 在有端口算力租用商(仙宫云)下部署chatglm2服务,并用nginx反向代理,然后在异地启动worker测试

内容

1 构建消息队列(Server)

1.1 RabbitMQ镜像

先采用之前的命令启动
在这里插入图片描述
在算力机使用阿里云镜像仓库拉取,分钟级完成启动
在这里插入图片描述

1.2 RabbitAgent服务

理论上,应该封装为镜像后,以容器方式启动。不过租用的算力机系统盘太小(50G),装完CUDA之后只剩下10G多的空间,所以这次就把项目文件搬过去,在宿主机启动。
以后这类轻量级的服务,可以用一个很小的python环境镜像封装。

在这里插入图片描述

在这里插入图片描述

res = req.post('http://IP:24098/send_workq_message/', json = para_dict)
<Response [200]>
# 6 永久启动服务nohup python3 server.py  >/dev/null 2>&1 &

在这里插入图片描述
消费者(手动确认消息模式)

            import pikaimport jsoncredentials = pika.PlainCredentials('user', 'passwd')connection = pika.BlockingConnection(pika.ConnectionParameters('IP', port, '/', credentials))channel = connection.channel()def callback(ch, method, properties, body):input_data = json.loads(body.decode())print(f" [x] Received ",input_data)# time.sleep(body.count(b'.'))print(" [x] Done")ch.basic_ack(delivery_tag = method.delivery_tag)# channel.queue_declare(queue='hello1',durable=True)# 消费者预取消息数channel.basic_qos(prefetch_count=3)# 1 消费持久化的队列#channel.basic_consume(queue='hello1',#                        on_message_callback=callback, auto_ack =False)    # 2 消费非持久化队列channel.basic_consume(queue='hello2',on_message_callback=callback, auto_ack =False)    print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()[*] Waiting for messages. To exit press CTRL+C[x] Received  {'msg_id': 1, 'msg': 'first msg'}[x] Done[x] Received  {'msg_id': 2, 'msg': 'second msg'}[x] Done[x] Received  {'msg_id': 1, 'msg': 'first msg'}[x] Done[x] Received  {'msg_id': 2, 'msg': 'second msg'}[x] Done

1.3 将任务数据通过RabbitAgent写入

写入2.8万条,耗时5秒。
在这里插入图片描述

2 无端口算力租用商Worker测试

除了一定要返回的结果数据,还应该加上机器名称,显卡配置与处理时长。

2.1 启动服务

无端口算力机的代表就是AutoDL了,他们家机器也偏贵,4090一小时2.5~2.6元,比仙宫云高不少(我还是比较prefer后者的)。目前暂时没发现AutoDL有什么特别的优点,中规中矩。

在这里插入图片描述

发送文件

rsync -rvltz  -e 'ssh -p 44620'  --progress /data/implement_container_file/chatglm2_6b_int4_api/workspace  root@connect.westb.seetacloud.com:/root/autodl-tmp/

在这里插入图片描述
然后修改api.py中模型加载的位置和端口号,启动3个服务。

以下是获取单条数据并进行调试的方法

import pika
import json
credentials = pika.PlainCredentials('x', 'xxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('xxxx', 24091, '/', credentials,heartbeat=600))
channel = connection.channel()
# 声明一个队列
channel.queue_declare(queue='ent_intro_task', durable=True)
# 从队列中获取一条消息
method_frame, header_frame, body = channel.basic_get(queue='ent_intro_task')
data = body.decode('utf-8')
data1 = json.loads(data)
...
connection.close()

完成测试之后,打包为ent_intro_worker.py,该脚本接受一个端口输入,以便将worker和server匹配起来,充分利用资源。

import pika
import json
import timeimport sys
import requests as req # 获取命令行参数
if len(sys.argv) > 1:parameter_value = sys.argv[1]print("传入的参数值为:", parameter_value)
else:print("未传入参数")
def send_resp(a_message):message_list = [a_message]para_dict = {}para_dict['rabbit'] = 'rabbit01'para_dict['routing_key'] = 'ent_intro_result'para_dict['durable'] = Truepara_dict['message_list'] = message_listpara_dict['queue'] = 'ent_intro_result'resp = req.post('http://IP:PORT/send_workq_message/', json = para_dict)return True tmp ='''
成立日期:%s 
注册地址:%s
%s简介,字数在100-200字之间
'''
credentials = pika.PlainCredentials('andy', 'andy123')
connection = pika.BlockingConnection(pika.ConnectionParameters('IP', PORT, '/', credentials, heartbeat=600))channel = connection.channel()# 手动确认
def callback(ch, method, properties, body):input_data = json.loads(body.decode())print(f" [x] Received ",input_data)tick1 = time.time()prompt_content = {'prompt': tmp % (input_data['reg_dt'], input_data['addr'], input_data['ent_table_name'] )}res = req.post('http://127.0.0.1:%s/' % parameter_value, json =prompt_content).json()tick2 = time.time()a_message = {}a_message['company'] =  input_data['ent_table_name']a_message['intro'] =  res['response']a_message['spends'] = tick2-tick1send_resp(a_message)print(" [x] Done")ch.basic_ack(delivery_tag = method.delivery_tag)channel.queue_declare(queue='ent_intro_task',durable=True)
channel.basic_qos(prefetch_count=3)
channel.basic_consume(queue='ent_intro_task',on_message_callback=callback, auto_ack =False)    print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

该worker获取数据,然后向本地大模型服务请求结果,然后将结果写到结果队列。启动worker进行测试,python3 ent_intro_worker.py 24096
没问题后就转入后台运行:nohup python3 ent_intro_worker.py 24096 >/dev/null 2>&1 &

在这里插入图片描述

3 有端口算力租用商Worker测试

3.1 负载均衡

由于单个的量化模型不足以充分利用显卡的性能,所以就要启动多个同样的服务。调用时需要进行多个服务的端口指定,这样就比较麻烦。
在这里插入图片描述
用nginx进行负载均衡,然后只暴露一个端口作为服务接口。然后接下来在远程主机调用这个服务接口(worker)。

租用一台仙宫云主机。数据上传有点问题,感觉它的云盘是外挂的,而且不稳定。最终我把数据先传到系统盘,再从系统盘传到云盘才成功。另外在启动服务时,模型的加载时间明显太长了。感觉云盘是机械盘。

rsync -rvltz  -e 'ssh -p 111'  --progress /data/implement_container_file/chatglm2_6b_int4_api/workspace  root@m1ehp5n70rxvg81b.ssh.x-gpu.com:/root/
==> /root/cloud/

安装包

pip3 install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/

启动三个服务。

安装、配置并启动nginx。

events {#设置工作模式为epoll,除此之外还有select,poll,kqueue,rtsig和/dev/poll模式use epoll;#定义每个进程的最大连接数,受系统进程的最大打开文件数量限制worker_connections  1024;
}http{# 配置nginx上传文件最大限制client_max_body_size 50000m;upstream multi_ma {# fair;server 172.17.0.1:10000 ;server 172.17.0.1:10001 ;server 172.17.0.1:10002 ;}server {listen 80;location / {proxy_pass http://multi_ma;}}}

远端使用worker调用。

实操时发现,虽然仙宫云可以给一个80端口,但是似乎也是容器里的虚拟环境,不让再安装包了,所以也没法安装nginx。不过理论上应该可以实现。

最后,还是用类似AutoDL的方式启动3个worker。

两块4090之后,速度明显快多了。
在这里插入图片描述

3.2 获取结果并入库

建立对应的表

# 2 导入包
from Basefuncs import * # 快速载入连接
def make_local_wmongo_connect(server_name):try:tem_w = from_pickle(server_name)print('【Loading cur_w】from pickle')except:w = WMongo('w')tem_w = w.TryConnectionOnceAndForever(server_name =server_name)to_pickle(tem_w, server_name)return tem_wm8_cur_w = make_local_wmongo_connect('m8.24003')# 建立索引
m8_cur_w.set_a_index(tier1='llm', tier2 = 'company_intro', idx_var='pid')
m8_cur_w.set_a_index(tier1='llm', tier2 = 'company_intro', idx_var='company')
m8_cur_w.set_a_index(tier1='llm', tier2 = 'company_intro', idx_var='model_name')

从结果队列里取数,然后入库

# 封装函数
def get_some_batch_updated():credentials = pika.PlainCredentials('xxx', 'xxx')connection = pika.BlockingConnection(pika.ConnectionParameters('IP', port, '/', credentials, heartbeat=600))# 2 迭代的获取数据res_list = []with connection.channel() as channel:for i in range(100):# 声明一个队列channel.queue_declare(queue='ent_intro_result', durable=True)# 从队列中获取一条消息method_frame, header_frame, body = channel.basic_get(queue='ent_intro_result')res_dict = json.loads(body.decode())res_list.append(res_dict)channel.basic_ack(delivery_tag=method_frame.delivery_tag)# 3 拼凑为标准数据框res_df = pd.DataFrame(res_list)# 增加必要的模型字段res_df['model_name'] = 'chatglm2_6b_int4'res_df['pid'] = (res_df['company'] + res_df['model_name']).apply(md5_trans)m8_cur_w.insert_or_update_with_key(tier1 = 'llm', tier2 = 'company_intro', data_listofdict= res_df.to_dict(orient='records') , key_name='pid')connection.close()# 获取并存储100条
get_some_batch_updated()

4 结语

本次完成了:

  • 1 RabbitMQ 和 RabbitAgent的建立。这使得其他机器可以不必要使用端口,非常适合超高计算传输比的任务。
  • 2 将原始数据通过rabbit agent 发布到任务队列
  • 3 将chatglm2-6b部署到算力租用机:测试了主流的三家autodl, anygpu和仙宫云,都是ok的
  • 4 在各算力机上启动worker进行处理
  • 5 将结果获取,然后存在本地的mongo

没能成功完成的实践是在仙宫云使用nginx做负载均衡,简化worker的请求。

结论:用llm来做任务成本还是比较高的。价格折算下来,大约 ¥1/千条。所以,要把大模型用在高价值领域,例如替代人工打标,写函数这些。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/306786.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AR地图导览小程序是怎么开发出来的?

在移动互联网时代&#xff0c;AR技术的发展为地图导览提供了全新的可能性。AR地图导览小程序结合了虚拟现实技术和地图导航功能&#xff0c;为用户提供了更加沉浸式、直观的导览体验。本文将从专业性和思考深度两个方面&#xff0c;探讨AR地图导览小程序的开发方案。 编辑搜图 …

【大语言模型】基础:如何处理文章,向量化与BoW

词袋模型&#xff08;BoW&#xff09;是自然语言处理&#xff08;NLP&#xff09;和机器学习中一种简单而广泛使用的文本表示方法。它将文本文档转换为数值特征向量&#xff0c;使得可以对文本数据执行数学和统计操作。词袋模型将文本视为无序的单词集合&#xff08;或“袋”&a…

给现有rabbitmq集群添加rabbitmq节点

现有的&#xff1a;10.2.59.216 rabbit-node1 10.2.59.217 rabbit-node2 新增 10.2.59.199 rabbit-node3 1、分别到官网下载erlang、rabbitmq安装包&#xff0c;我得版本跟现有集群保持一致。 erlang安装包&#xff1a;otp_src_22.0.tar.gz rabbitmq安装包&#xff1…

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第三套

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第三套 (共9套&#xff0c;有答案和解析&#xff0c;答案非官方&#xff0c;未仔细校正&#xff0c;仅供参考&#xff09; 部分题目分享&#xff0c;完整版获取&#xff08;WX:didadidadidida313&#xff0c;加我备注&#x…

c++编程(3)——类和对象(1)、类

欢迎来到博主的专栏——c编程 博主ID&#xff1a;代码小豪 文章目录 类对象类的访问权限类的作用域 类 c最初对c语言的扩展就是增加了类的概念&#xff0c;使得c语言在原有的基础之上可以做到信息隐藏和封装。 那么我们先来讲讲“带类的c”与C语言相比有什么改进。 先讲讲类…

Golang | Leetcode Golang题解之第24题两两交换链表中的节点

题目&#xff1a; 题解&#xff1a; func swapPairs(head *ListNode) *ListNode {dummyHead : &ListNode{0, head}temp : dummyHeadfor temp.Next ! nil && temp.Next.Next ! nil {node1 : temp.Nextnode2 : temp.Next.Nexttemp.Next node2node1.Next node2.Nex…

论文阅读:Polyp-PVT: Polyp Segmentation with PyramidVision Transformers

这篇论文提出了一种名为Polyp-PVT的新型息肉分割框架&#xff0c;该框架采用金字塔视觉变换器&#xff08;Pyramid Vision Transformer, PVT&#xff09;作为编码器&#xff0c;以显式提取更强大的特征。本模型中使用到的关键技术有三个&#xff1a;渐进式特征融合、通道和空间…

【vue】watch 侦听器

watch&#xff1a;可监听值的变化&#xff0c;旧值和新值 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><titl…

【opencv】示例-imgcodecs_jpeg.cpp使用OpenCV库来创建和处理图像,并保存为不同JPEG采样因子的版本...

上层-原始图像 下层&#xff1a;编码解码后的lossy_img #include <opencv2/core.hpp> // 包含OpenCV核心功能的头文件 #include <opencv2/imgproc.hpp> // 包含OpenCV图像处理功能的头文件 #include <opencv2/imgcodecs.hpp> // 包含OpenCV图像编码解码功能…

平板设备IP地址设置指南

在数字化时代&#xff0c;平板电脑作为便携且功能强大的设备&#xff0c;广泛应用于日常生活和工作中。为了确保平板能够正常接入网络并与其他设备进行通信&#xff0c;正确设置IP地址是至关重要的。虎观小二将为您介绍如何设置平板的IP地址&#xff0c;帮助您轻松完成网络配置…

大创项目推荐 深度学习+opencv+python实现车道线检测 - 自动驾驶

文章目录 0 前言1 课题背景2 实现效果3 卷积神经网络3.1卷积层3.2 池化层3.3 激活函数&#xff1a;3.4 全连接层3.5 使用tensorflow中keras模块实现卷积神经网络 4 YOLOV56 数据集处理7 模型训练8 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &am…

数字IC/FPGA——锁存器/触发器/寄存器

本文主要介绍以下几点&#xff1a; 什么是触发器和锁存器门电路和触发器的区别什么是电平钟控触发器电平钟控触发器触发器和锁存器的区别触发器的分类方式&#xff1a;逻辑功能、触发方式、电路结构、存储数据原理、构成触发器的基本器件寄存器利用移位寄存器实现串并转换或并…

WordPress LayerSlider插件SQL注入漏洞复现(CVE-2024-2879)

0x01 免责声明 请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;作者不为此承担任何责任。工具来自网络&#xff0c;安全性自测&#xff0c;如有侵权请联系删…

LiveNVR监控流媒体Onvif/RTSP功能-概览负载统计展示取流中、播放中、录像中点击柱状图快速定位相关会话

LiveNVR概览负载统计展示取流中、播放中、录像中点击柱状图快速定位相关会话 1、负载信息说明2、快速定位会话3、RTSP/HLS/FLV/RTMP拉流Onvif流媒体服务 1、负载信息说明 实时展示取流中、播放中、录像中等使用数目 取流中&#xff1a;当前拉流到平台的实时通道数目播放中&am…

基于单片机的智能锁芯报警系统设计

摘 要:在传统的智能锁芯报警系统中,存在响应时间较长的问题,为此,提出一种基于单片机的智能锁芯报警系统。通过控制模块、智能锁芯设置模块、报警模块、中断模块、液晶模块等,建立系统总体框架,根据系统总体框架,通过单片机、电源适配器、智能锁芯、报警器、LED灯等…

浏览器工作原理与实践--HTTP/2:如何提升网络速度

上一篇文章我们聊了HTTP/1.1的发展史&#xff0c;虽然HTTP/1.1已经做了大量的优化&#xff0c;但是依然存在很多性能瓶颈&#xff0c;依然不能满足我们日益变化的新需求&#xff0c;所以就有了我们今天要聊的HTTP/2。 本文我们依然从需求的层面来谈&#xff0c;先分析HTTP/1.1存…

CMake构建OpenCv并导入QT项目过程中出现的问题汇总

前言 再此之前请确保你的环境变量是否配置&#xff0c;这是总共需要配置的环境变量 E:\cmake\bin E:\OpenCv\opencv\build\x64\vc15\bin F:\Qt\Tools\mingw730_64\bin F:\Qt\5.12.4\mingw73_64\bin 问题一&#xff1a; CMake Error: CMake was unable to find a build program…

Java 中文官方教程 2022 版(四十五)

原文&#xff1a;docs.oracle.com/javase/tutorial/reallybigindex.html 教程&#xff1a;自定义网络 原文&#xff1a;docs.oracle.com/javase/tutorial/networking/index.html Java 平台备受推崇&#xff0c;部分原因是其适用于编写使用和与互联网资源以及万维网进行交互的程…

Git Clone succeeded, but checkout failed

Clone succeeded, but checkout failed&#xff1a; Filename too long 原因&#xff1a; 由于系统限制&#xff0c;路径太长&#xff0c;无法检出 解决方案&#xff1a; # git允许长路径&#xff0c;在已clone的仓库执行 git config core.longpaths true # 再次检出 git ch…

云原生数据库海山(He3DB)PostgreSQL版核心设计理念

本期深入解析云原生数据库海山PostgreSQL版&#xff08;以下简称“He3DB”&#xff09;的设计理念&#xff0c;探讨在设计云原生数据库过程中遇到的工程挑战&#xff0c;并展示He3DB如何有效地解决这些问题。 He3DB是移动云受到 Amazon Aurora 论文启发而独立自主设计的云原生数…