Python 全栈系列243 S2S flask_celery

说明

按现有的几个架构部件,构建数据流。

S = Redis Stream。这个可以作为缓冲队列和简单任务队列,速度非常快,至少是万条/秒的速度。
Q = RabbitMQ。这个作为任务队列,消息也主要是元数据。读速比较慢,但有一些特性,然后自带前端,作为任务队列比较合适。
M = Mongo。这个作为数据主库还是比较合适的。具有丰富的数据操作模式,同时性能也不错。
C = ClickHouse。这个特别适合作为任务数据库。因为列式存储的特性,其吞吐性能,简单统计功能甚至逼近了程序处理的速度。例如,存储10万条数据,大约也就3秒;统计900万数据某个字段的长度,时间也不到5秒。(过去在处理上,基本上按照100万条/秒来评估默认的程序处理能力)

RabbitMQ和Redis Streams都是流行的队列系统,用于处理消息传递任务,但它们在效率和应用场景上有所不同。
RabbitMQ是基于AMQP(高级消息队列协议)的开源消息代理,它提供了可靠的消息传递机制,能够保证消息的持久性,即使在发送或接收过程中出现故障也不会丢失消息。RabbitMQ适用于需要高可靠性和复杂路由策略的生产环境,特别是在分布式系统中,它能够很好地处理复杂的异步消息传递任务。
另一方面,Redis Streams是Redis 5.0版本引入的新特性,它提供了一个持久化的消息队列系统。Redis Streams的设计理念在于提供高性能的发布/订阅模型,尤其适合于即时消息处理场景。与RabbitMQ相比,Redis Streams在性能上具有优势,因为它利用了Redis本身的高性能特性,使得消息的读写速度非常快。
在效率方面,Redis Streams通常被认为比RabbitMQ更快,特别是在处理大量实时数据流时。这是因为Redis作为一个内存中的键值存储系统,本身就具有很高的读写速度,而Streams作为其一部分,也继承了这种高效性。此外,Redis Streams的无锁设计进一步提升了其性能,使得它在处理并发请求时更加高效。
然而,RabbitMQ在某些情况下可能更适合使用,特别是当需要处理复杂的异步任务、保证消息的顺序性以及实现可靠的消息传递时。RabbitMQ的这些特性使得它在金融、医疗等关键行业中被广泛应用。
总之,在选择RabbitMQ还是Redis Streams时,应考虑到具体的应用场景、性能需求和可靠性要求。如果追求极致的性能和实时性,Redis Streams可能是更好的选择;而如果需要更高的可靠性和复杂的路由功能,RabbitMQ可能更为合适。

本次目标是搭建一个worker,可以通过参数化方式,完成两个S间的流转。除了M和C之前一般不会直接流转,那么应该有 4*3 - 2 = 10 种组件间的流转。

内容

整体的实现逻辑顺序为:

  • 1 使用QManager完成S2S的动作(函数)
  • 2 将函数定义为celery task
  • 3 将flask-celery发布为systemd服务

1 S2S 函数

S2S应该是一种最常见的任务

首先是QManager, 这个是对RedisAgent进行封装和集成的对象,本质上是个二传手。

QManager 集成了:

  • 1 判断队列是否可以写入
  • 2 并行写入
  • 3 fetch和range两种方式取数
  • 4 删除消息
import requests as req 
class QManager:def __init__(self , batch_size = 1000, redis_agent_host = 'http://172.17.0.1:24021/',redis_connection_hash =None,q_max_len = 100000):self.batch_size = batch_sizeself.redis_agent_host = redis_agent_hostself.redis_connection_hash = redis_connection_hashself.q_max_len = q_max_lendef auto_connect(self, db_server_name):print('这里应该根据某个参数值,自动切换为合适的连接')def info(self):return req.post(self.redis_agent_host + 'info/',json = {'connection_hash':self.redis_connection_hash}).json()# redis没有提供命令来列出streams# def qname_list(self, stream_name = '*'):#     return req.post(self.redis_agent_host + 'info_stream/',json = {'stream_name':stream_name}).json()# 查看队列长度def stream_len(self, stream_name):cur_len_resp = req.post(self.redis_agent_host + 'len_of_queue/',json ={'stream_name':stream_name,'connection_hash':self.redis_connection_hash}).json()return cur_len_resp['data']# 创建队列和分组def ensure_group(self, stream_name, group_name ='group1', start_point='0'):return req.post(self.redis_agent_host +'ensure_group/',json ={'stream_name':stream_name,'group_name':group_name,'start_point':start_point}).json()# 判断队列是否可以插入def _is_q_available(self,stream_name):cur_len = self.stream_len(stream_name)if cur_len + self.batch_size >=self.q_max_len:return False else:return True #  基于并发方法,向数据库存数【队列Write相关-写入消息】- 其实是使用pipeline - 最好单次一万左右def parrallel_write_msg(self,stream_name, data_listofdict = None, time_out = 30,is_return_msg_id_list=False):resp_dict = req.post(self.redis_agent_host + 'batch_add_msg/',json ={'connection_hash':self.redis_connection_hash,'stream_name':stream_name,'msg_dict_list':data_listofdict,'maxlen':self.q_max_len,'is_return_msg_id_list':is_return_msg_id_list},timeout=time_out).json()return resp_dict# 读取# 批量获取数据 getdef xrange(self, stream_name, count = None):cur_count = count or self.batch_size recs_resp = req.post(self.redis_agent_host + 'xrange/',json ={'connection_hash':self.redis_connection_hash, 'stream_name':stream_name,'count':cur_count}).json()return recs_resp# 批量获取数据 fetchdef xfetch(self, stream_name, count = None,group_name = 'group1' , consumer_name = 'consumer1'):cur_count = count or self.batch_sizereturn req.post(self.redis_agent_host + 'fetch_msg/',json = {'connection_hash':self.redis_connection_hash,'stream_name':stream_name,'group_name':group_name,'consumer_name':consumer_name,'count':cur_count}).json()# 批量删除消息def xdel(self,stream_name,mid_or_list =None):if len(mid_or_list):return req.post(self.redis_agent_host  + 'del_msg/',json ={'connection_hash':self.redis_connection_hash, 'stream_name':stream_name,'mid_or_list':mid_or_list}).json()@staticmethoddef extract_msg_id(some_msg_list):return [x['_msg_id'] for x in some_msg_list]

基于此,稍微修改就可以完成S2S的任务

按照边的方式,给到left和right的参数信息。使用这些信息分别初始化left和right的QManager。最后按照配置里的约定,执行n次同步。每次执行时,都会看下目标队列是否已满,若已满则放弃写入,否则执行写入,然后删除消息。

# local
cfg = {'target_q_max_len': 10,'source_read_batch_num':1,'target_write_batch_num':1,'source_redis_agent_host':'http://172.17.0.1:24021/','source_connection_hash':None,'target_redis_agent_host':'http://172.17.0.1:24021/','target_connection_hash':None,'source_stream':'.'.join(['STREAM','test','test', 'stream_in']),'target_stream':'.'.join(['STREAM','test','test', 'stream_out'])}# read
source_qm = QManager(batch_size =cfg['source_read_batch_num'],redis_agent_host = cfg['source_redis_agent_host'],redis_connection_hash = cfg['source_connection_hash'])
# write
target_qm = QManager(batch_size =cfg['target_write_batch_num'],redis_agent_host = cfg['target_redis_agent_host'],redis_connection_hash = cfg['target_connection_hash'])# 确保队列的存在
if True:source_qm.ensure_group(cfg['source_stream'])target_qm.ensure_group(cfg['target_stream'])'''
主逻辑:- 1 判断目标队列是否满,如果是,那么直接退出
- 2 从源队列取数(采用xrange方法),如果没有数据,直接退出【每对stream之间,只会有一个 sniffer 】
- 3 将源队列数据写入目标队列
- 4 从源队列中删除这些数据'''print('source q len ', source_qm.stream_len(cfg['source_stream']))
print('target q len ', target_qm.stream_len(cfg['target_stream']))for _ in range(cfg['max_exec_cnt']):if target_qm._is_q_available(cfg['target_stream']):print('target q ok')msg_num_limit = min(cfg['source_read_batch_num'],cfg['target_write_batch_num'])msg_list = source_qm.xrange(cfg['source_stream'], count=msg_num_limit)['data']if len(msg_list) == 0:print('source q empty')breakelse:# 写入目标队列target_qm.parrallel_write_msg(cfg['target_stream'], data_listofdict= msg_list)# 将写入的消息从源队列删除to_del_msg_id_list = source_qm.extract_msg_id(msg_list)source_qm.xdel(cfg['source_stream'], mid_or_list= to_del_msg_id_list)else:break

2 Celery Task

然后将上述功能函数写入Flask-Celery

第一部分是在 celery的修饰器下,将任务函数搬进去。然后在app下定义了任务的调用,主要是用到了delay方法,实现异步调用。

# =======================以下是正式的内容
@celery_.task
def s2s_handler(cfg_dict = None):cfg = cfg_dict# readsource_qm = QManager(batch_size =cfg['source_read_batch_num'],redis_agent_host = cfg['source_redis_agent_host'],redis_connection_hash = cfg['source_connection_hash'])# writetarget_qm = QManager(batch_size =cfg['target_write_batch_num'],redis_agent_host = cfg['target_redis_agent_host'],redis_connection_hash = cfg['target_connection_hash'])print('source q len ', source_qm.stream_len(cfg['source_stream']))print('target q len ', target_qm.stream_len(cfg['target_stream']))for _ in range(cfg['max_exec_cnt']):if target_qm._is_q_available(cfg['target_stream']):print('target q ok')msg_num_limit = min(cfg['source_read_batch_num'],cfg['target_write_batch_num'])msg_list = source_qm.xrange(cfg['source_stream'], count=msg_num_limit)['data']if len(msg_list) == 0:print('source q empty')breakelse:# 写入目标队列target_qm.parrallel_write_msg(cfg['target_stream'], data_listofdict= msg_list)# 将写入的消息从源队列删除to_del_msg_id_list = source_qm.extract_msg_id(msg_list)source_qm.xdel(cfg['source_stream'], mid_or_list= to_del_msg_id_list)else:break# 执行任务的路由 POST
@app.route("/s2s/", methods=['GET','POST'] )
def s2s():input_data = request.get_json()# 发送任务到celery,并返回任务ID,后续可以根据此任务ID获取任务结果result = s2s_handler.delay(input_data)return result.id

调用测试,存入一万条消息(之前还有70条残留),任务执行后,source_q中的数据将会逐渐流转到target_q

# debug - 样例数据写入源队列
data_listofdict = [{'msg_id': i, 'data':'test'} for i in range(10000)]
source_qm.parrallel_write_msg(cfg['source_stream'], data_listofdict= data_listofdict)print('source q len ', source_qm.stream_len(cfg['source_stream']))
print('target q len ', target_qm.stream_len(cfg['target_stream']))source q len  10070
target q len  230import requests as req 
# 假设是发往本机: 注意,地址是127.0.0.1
cfg1 = {'target_q_max_len': 100000,'source_read_batch_num':1,'target_write_batch_num':1,'source_redis_agent_host':'http://127.0.0.1:24021/','source_connection_hash':None,'target_redis_agent_host':'http://127.0.0.1:24021/','target_connection_hash':None,'source_stream':'.'.join(['STREAM','test','test', 'stream_in']),'target_stream':'.'.join(['STREAM','test','test', 'stream_out']),'max_exec_cnt':10}resp = req.post('http://127.0.0.1:24104/s2s/',json = cfg1 )# 返回任务号
In [9]: resp.text
Out[9]: '177e57b7-09c5-43f0-ae1f-0cbe8e41dbf5'# 流转了10条消息
In [10]: print('source q len ', source_qm.stream_len(cfg['source_stream']))...: print('target q len ', target_qm.stream_len(cfg['target_stream']))
source q len  10060
target q len  240

3 Systemd Service

由于服务是在宿主机启动的,而且是基础服务,所以使用systemd配置自启动。启动命令有点小坑,可参考 一次搞定 Linux systemd 服务脚本

本次要点就在于要用forking启动【采用sh脚本启动其他进程时Type须为forking】,因为要启动flask和celery两个服务才行。

[Unit]   
Description=test        # 简单描述服务
After=network.target    # 描述服务类别,表示本服务需要在network服务启动后在启动
Before=xxx.service      # 表示需要在某些服务启动之前启动,After和Before字段只涉及启动顺序,不涉及依赖关系[Service] 
Type=forking            # 设置服务的启动方式
User=USER               # 设置服务运行的用户
Group=USER              # 设置服务运行的用户组
WorkingDirectory=/PATH  # 设置服务运行的路径(cwd)
KillMode=control-group  # 定义systemd如何停止服务
Restart=no              # 定义服务进程退出后,systemd的重启方式,默认是不重启
ExecStart=/start.sh     # 服务启动命令,命令需要绝对路径(采用sh脚本启动其他进程时Type须为forking)[Install]   
WantedBy=multi-user.target  # 多用户

然后就好了
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/324432.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux(openEuler、CentOS8)常用的IP修改方式(文本配置工具nmtui+配置文件+nmcli命令)

----本实验环境为openEuler系统<以server方式安装>&#xff08;CentOS类似&#xff0c;可参考本文&#xff09;---- 一、知识点 &#xff08;一&#xff09;文本配置工具nmtui(openEuler已预装) nmtui&#xff08;NetworkManager Text User Interface&#xff09;是一…

【LeetCode算法】389. 找不同

提示&#xff1a;此文章仅作为本人记录日常学习使用&#xff0c;若有存在错误或者不严谨得地方欢迎指正。 文章目录 一、题目二、思路三、解决方案 一、题目 给定两个字符串 s 和 t &#xff0c;它们只包含小写字母。字符串 t 由字符串 s 随机重排&#xff0c;然后在随机位置添…

Linux第三节--常见的指令介绍集合(持续更新中)

点赞关注不迷路&#xff01;&#xff0c;本节涉及初识Linux第三节&#xff0c;主要为常见的几条指令介绍。 如果文章对你有帮助的话 欢迎 评论&#x1f4ac; 点赞&#x1f44d;&#x1f3fb; 收藏 ✨ 加关注&#x1f440; 期待与你共同进步! Linux下基本指令 1. man指令 Linu…

【网络原理】HTTPS | 对称加密 | 非对称加密 | 中间人攻击 | 证书 | 公钥 | 私钥

文章目录 HTTPSHTTPS的工作过程1.对称加密&#xff0c;加密业务数据2.非对称加密&#xff0c;加密对称密钥中间人攻击 3.使用证书&#xff0c;校验服务器的公钥 HTTPS 在http的基础上引入了一个加密层 1.对称加密&#xff0c;加密和解谜使用同一个密钥 2.非对称加密。有两个…

linux性能监控之top

说完了atop和htop&#xff0c;我们在来说说Linux自带的top&#xff0c;我们先看看命令效果&#xff1a; 可以看到是一个实时的系统监控工具&#xff0c;提供了一个动态的、交互式的实时视图&#xff0c;显示系统的整体性能信息以及正在运行的进程的相关信息。 我们先来解析下命…

【解决问题】电脑耳机可以听到对方声音,对方听不到自己声音

背景&#xff1a; 项目线上开会&#xff0c;自己说话对方一直听不到&#xff0c;但是我可以听到对方的声音 问题排查&#xff1a; 针对这个问题&#xff0c;尝试重新启动电脑&#xff0c;各种查看声音设置&#xff0c;都没有解决 最后看声卡驱动&#xff0c;想更新声卡驱动…

学习网络需要认识的各种设备

网桥&#xff08;bridge&#xff09; 网桥工作在数据链路层&#xff0c;可以把多个局域网连接起来&#xff0c;组成一个更大的局域网 以太网中&#xff0c;数据链路层地址就是mac地址&#xff0c;网桥与集线器的区别就是&#xff0c;网桥会过滤mac&#xff0c;只有目的mac地址…

Ardupilot Rpanion iperf网络性能测试

Ardupilot Rpanion iperf网络性能测试 1. 源由2. 分析3. 安装4. 测试4.1 第一次测试4.1.1 iperf测试参数A4.1.1.1 测试链路14.1.1.2 测试链路24.1.1.3 测试链路3 4.1.2 iperf测试参数B - 测试链路34.1.2.1 测试数据4.1.2.2 数据简单分析4.1.2.3 数据深入分析4.1.2.4 模拟测试网…

【八】centos7.6安装chrome和chromedriver并启动selenium

学习来源&#xff1a; 安装chrome和chrome driver -----https://blog.csdn.net/zdlcome/article/details/133813441 安装Python11 -----https://blog.csdn.net/weixin_43741408/article/details/130251102 chromedriver下载地址 -----https://googlechromelabs.github.io/chrom…

# 从浅入深 学习 SpringCloud 微服务架构(八)Sentinel(1)

从浅入深 学习 SpringCloud 微服务架构&#xff08;八&#xff09;Sentinel&#xff08;1&#xff09; 一、sentinel&#xff1a;概述 1、前言 – 服务熔断 Hystrix 的替换方案。 1&#xff09;2018年底 Netflix 官方宣布 Hystrix 已经足够稳定&#xff0c;不再积极开发 Hys…

用标准的GNU/Linux命令替换Alpine上的精简版命令

Alpine Linux 是一个基于 musl libc 和 busybox 的轻量级Linux发行版&#xff0c;busybox 实现了很多常用类Unix命令的精简版&#xff0c;特点是体积很小&#xff0c;舍弃了很多不常用参数&#xff0c;我们简单对比一下标准Linux自带的 date 命令 和 Alpine下默认的 date 命令便…

初期Linux

一&#xff0c;系统分为 1.1window系统 个人 &#xff1a;win7&#xff0c;win8&#xff0c;Win10&#xff0c;Win11服务器版&#xff1a;window server 2003&#xff0c;window server 2008 1.2Linux系统 centos7redhatubantukali 1.3什么是Linux&#xff1f; Linux是基…

连通“数据”,让制造变“聪明”

说起数据智能&#xff0c;你第一时间想到的是什么呢&#xff1f;是科技感十足的智慧城市&#xff1f;还是炫酷的人工智能景象&#xff1f; 数据作为企业的战略资产越来越受到重视&#xff0c;从最初的数据协助业务协同&#xff0c;转化为数据驱动业务&#xff0c;数据驱动运营…

ORACLE ODAX9-2的一个误告警Affects: /SYS/MB的分析处理

在运维的多套ORACLE ODAX9-2版本&#xff0c;都遇到了一个计算节点的告警&#xff1a;Description: The service Processor poweron selftest has deteced a problem. Probabity;:100, UulD:cd1ebbdf-f099-61de-ca44-ef646defe034, Resource:/SYS/MB,&#xff1b;此告警从描述上…

静态分析-RIPS-源码解析记录-01

token流扫描重构部分&#xff0c;这一部分主要利用php的token解析api解析出来的token流&#xff0c;对其中的特定token进行删除、替换、对于特定的语法结构进行重构&#xff0c;保持php语法结构上的一致性 解析主要在lib/scanner.php中通过Tokenizer这个类来实现,也就是在main…

怎么把手机ip地址变成了外省

在日常使用中&#xff0c;有时我们可能因为某些原因需要快速切换手机的IP地址&#xff0c;特别是当需要从一个省份切换到另一个省份的IP时。这种需求可能来源于网络访问限制、地理位置相关服务的使用、或者网络安全等方面的考虑。那么&#xff0c;怎么把手机IP地址变成外省呢&a…

测评工作室的养号成本,效率,纯净度,便捷性等问题怎么解决?

大家好&#xff0c;我是南哥聊跨境&#xff0c;最近有很多做测评工作室的朋友找到南哥&#xff0c;问我有什么新的测评养号系统可以解决成本&#xff0c;效率&#xff0c;纯净度&#xff0c;便捷性等问题 测评养号系统从最早的模拟器、虚拟机到911、VPS、手机设备等&#xff0…

正点原子[第二期]Linux之ARM(MX6U)裸机篇学习笔记-12-蜂鸣器

前言&#xff1a; 本文是根据哔哩哔哩网站上“正点原子[第二期]Linux之ARM&#xff08;MX6U&#xff09;裸机篇”视频的学习笔记&#xff0c;在这里会记录下正点原子 I.MX6ULL 开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了正点原子教学视频和链接中的内容。…

【大模型】LLaMA-1 模型介绍

文章目录 一、背景介绍二、模型介绍2.1 模型结构2.2 模型超参数2.3 SwiGLU 三、代码分析3.1 模型结构代码3.2 FairScale库介绍 四、LLaMA家族模型4.1 Alpaca4.2 Vicuna4.3 Koala(考拉)4.4 Baize (白泽)4.5 Luotuo (骆驼&#xff0c;Chinese)4.6 其他 参考资料 LLaMA&#xff08…

Human β-NGF ELISA试剂盒

走近β-NGF 神经生长因子(nerve growth factor, NGF)最初从小鼠颌下腺中以7S复合体的形式分离而得&#xff0c;复合体由三个非共价连接的亚基α,β和γ组成。 NGF的α和β亚基均属于丝氨酸蛋白酶组织激肽释放酶家族成员&#xff0c;β亚基也称为β-NGF或2…