Celery + redis 异步分布式任务队列安装测试

Celery 异步分布式任务队列

Celery 5.4.0 官方文档

环境:3台 centos7.9 普通用户

redisSchedulerworker
dp951
dp96111
dp971

文章目录

  • Celery 异步分布式任务队列
    • 1、Celery 介绍
    • 2、安装部署
      • 2.1 安装消息中间件(broker)
      • 2.2 安装Celery
    • 3、功能测试
      • 3.1 创建任务
          • 3.1.1 在dp96 机器上创建应用文件 tasks.py
          • 3.1.2 分发应用到dp95、dp97 机器相同位置
      • 3.2 启动worker服务
          • 3.2.1 在dp95、dp96、dp97三台机器分别启动celery worker服务
          • 3.2.2 查看redis服务,borker 已存在队列,backend 无任何结果
      • 3.3 执行分布式异步任务
      • 3.4 查看异步任务结果

1、Celery 介绍

Celery是一个简单、灵活、可靠的分布式系统,基于python开发,可以处理大量的消息,同时提供维护这样一个系统所需的工具。

它是一个专注于实时处理的任务队列,同时也支持任务调度

使用场景

  • 定时任务:定时爬虫、算法模型定时输出
  • 异步任务:I/O密集型任务,消息推送、邮件发送、ai客服
  • 分布式调度:airflow + celery 大数据ETL调度

优点:使用后再说

架构:AMQP(Advanced Message Queuing Protocol)高级消息队列协议

2、安装部署

2.1 安装消息中间件(broker)

Celery 需要一个中间件来进行接收和发送消息,通常以独立的服务形式出现,成为 消息中间人(Broker)。官方推荐RabbitMQ和Redis。

RabbitMQ:

  • 优点:支持AMQP(高级消息队列协议),可靠性高,支持消息持久化,有丰富的功能特性(如消息确认、重试、超时、死信队列等)。
  • 缺点:学习曲线较陡峭,配置复杂,性能可能较低。

Redis:

  • 优点:配置简单,性能高,可以用作消息队列用于简单的场景。
  • 缺点:不支持AMQP,不适合重载的消息队列处理,不支持消息的持久化和异步确认。

本次测试安装 redis http://download.redis.io/releases/

# redis 安装
# 解压
[dp96]$ tar -zxvf redis-7.2.4.tar.gz
[dp96]$ cd redis-7.2.4
# 编译
[dp96]$ make
# 安装
[dp96]$ make PREFIX= ~/redis install# 修改配置文件
[dp96]$ vim ./redis.conf
'''
bind * -::*           # 绑定主机地址
protected-mode no     # 保护模式设置为 no,允许外网连接
port 6379             # 监听端口
timeout 0             # 当客户端闲置多长时间后关闭连接,如果指定为 0,表示关闭该功能
daemonize yes         # yes表示启用守护进程,默认是no即不以守护进程方式运行
loglevel notice       # 日志级别
logfile ./redis-server.log  # 指定 Redis 服务器的日志文件路径
dir ./                      # Redis 服务器的工作目录,即数据库文件的存放路径
pidfile /tmp/redis_6379.pid # 进程文件
'''# 启动redis
[dp96]$ cd ~/redis/bin
[dp96]$ ./redis-server ~/opt/redis/redis-7.2.4/redis.conf# 连接验证
[dp96]$ ./redis-cli
127.0.0.1:6379> CONFIG GET *

在这里插入图片描述

# redis 常用操作
# Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)# 切换数据库0-15
127.0.0.1:6379> select 1 
127.0.0.1:6379> scan 0 /keys *       # 查看0号数据库所有的key# 字符串类型
127.0.0.1:6379> set a 'test' 
127.0.0.1:6379> get a           # "test"
127.0.0.1:6379> del a# 哈希类型,适合存储对象
127.0.0.1:6379> hmset a field1 'Hello' field2 'world' 
127.0.0.1:6379> hget a            # (error) ERR wrong number of arguments for 'hget' command        
127.0.0.1:6379> hget a field2     # "world"
127.0.0.1:6379> del a             # 重复key 会报错# 列表
127.0.0.1:6379> lpush a redis
127.0.0.1:6379> lpush a rabbitmq
127.0.0.1:6379> lpush a 1
127.0.0.1:6379> lrange a 0 10
'''
1) "1"
2) "rabbitmq"
3) "redis"
'''# 集合
127.0.0.1:6379> sadd a 1
127.0.0.1:6379> sadd a 1
127.0.0.1:6379> sadd a 2
127.0.0.1:6379> smembers a
'''
1) "1"
2) "2"
'''# 有序集合 zadd key score member 
127.0.0.1:6379> zadd a 1 张三
127.0.0.1:6379> zadd a 1 李四
127.0.0.1:6379> zadd a 2 王五
127.0.0.1:6379> zrangebyscore a 0 1   # 选去0-1分的集合元素'''
1) "\xe5\xbc\xa0\xe4\xb8\x89"
2) "\xe5\xe6\x9d\x8e\xe5\x9b\x9b"
'''

2.2 安装Celery

# 安装celery
(airflow)[dp96]$ pip install celery -i https://pypi.tuna.tsinghua.edu.cn/simple(airflow)[dp95]$ pip install celery
(airflow)[dp97]$ pip install celery# 安装 redis Celery 远程连接redis服务时使用
(airflow)[dp96]$ pip install redis
(airflow)[dp95]$ pip install redis
(airflow)[dp97]$ pip install redis

3、功能测试

3.1 创建任务

3.1.1 在dp96 机器上创建应用文件 tasks.py
# 创建 tasks.py 
from celery import Celery
import timeapp = Celery('tasks', broker='redis://10.18.18.96:6379/0',backend='redis://10.18.18.96:6379/1') # broker 任务队列;backend 结果存储数据库@app.task
def time_sleep(n):time.sleep(n) return f'延时{n}s函数'
3.1.2 分发应用到dp95、dp97 机器相同位置
(airflow)[dp96 ~/celery]$ scp -r tasks.py dp95:~/celery
(airflow)[dp96 ~/celery]$ scp -r tasks.py dp97:~/celery

3.2 启动worker服务

3.2.1 在dp95、dp96、dp97三台机器分别启动celery worker服务
# 以下是Celery命令入口点的选项解释:
-A, --app APPLICATION: 指定Celery应用的模块名或路径。
-b, --broker TEXT: 指定消息代理的地址,例如RabbitMQ或Redis。
--result-backend TEXT: 指定任务结果的后端存储地址。
--loader TEXT: 指定Celery加载器的类型。
--config TEXT: 指定配置文件的路径。
--workdir PATH: 指定Celery工作目录。
-C, --no-color: 禁用彩色输出。
-q, --quiet: 静默模式,减少输出。
--version: 显示Celery版本号。
--skip-checks: 跳过配置检查。
# tasks 是我们任务所在的文件名,workdir tasks.py任务文件所在目录 worker 表示启动的是 worker 程序
(airflow)[dp96 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/  worker  --loglevel=info(airflow)[dp95 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/  worker  --loglevel=info
(airflow)[dp97 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/  worker  --loglevel=info

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.2.2 查看redis服务,borker 已存在队列,backend 无任何结果

在这里插入图片描述
在这里插入图片描述

3.3 执行分布式异步任务

3.3.1 在dp96另一个shell界面,在task.py文件同目录下,创建test.py文件发布任务

from tasks import time_sleep
import timedef test():start = time.time()res = []for i in range(10):res_ = time_sleep.delay(5)  # 发布任务到broker (redis),正常同步任务执行时间10*5=50sstop = time.time()print(f'运行时间:{stop-start}')

在这里插入图片描述

3.4 查看异步任务结果

dp95:
在这里插入图片描述

dp96:

在这里插入图片描述

dp97:

在这里插入图片描述

如图所示,3台机器分别从队列中取出了3、4、3个睡眠5秒的任务,并在各自机器上并行(异步)运行,3台机器累计运行任务共花费5s

redis 后台结果数据库:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/323493.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

骑出好身材,女士专属,这项运动让你健康美丽两不误。

在繁忙的生活节奏中,寻找一项既能放松心情又能塑形美体的运动,成为了现代女性的新追求。骑行,这项绿色低碳的运动方式,正以其独特的魅力,成为女士们的新宠。它不仅能够带你穿梭于城市的喧嚣与自然的宁静之间&#xff0…

C++面向对象程序设计 - 虚函数

在C中,虑函数(Virtual Function)是面向对象编程(OOP)中的一个重要概念,它允许派生类(或称为子类)覆盖基类(或称为父类)中的成员函数。当通过基类指针或引用调…

遥控挖掘机之ESP8266调试心得(1)

ESP8266调试心得 1. 前言2.遇到的问题2.1 ESP8266模块建立TCP连接时候报错2.2 指令异常问题 3. 更新ESP8266固件3. ESP8266的部分AT指令3. 连接步骤3.1 模块与电脑连接3.2.1 电脑上的设置3.2.2 ESP8266模块作为客户机(TCP Cilent)的设置步骤 3.2 模块与模…

电脑硬盘故障,这5种情况要了解!

在数字化时代,电脑硬盘作为存储数据的重要设备,其稳定性和安全性直接关系到用户的数据安全和工作效率。然而,硬盘故障却是一个无法完全避免的问题。为什么会出现电脑硬盘故障?出现该问题时应该如何解决?一文带你弄懂答…

k8s部署最新版zookeeper集群(3.9.2),并配置prometheus监控

目录 zookeeper集群部署创建zookeeper文件夹namespace.yamlscripts-configmap.yamlserviceaccount.yamlstatefulset.yamlsvc-headless.yamlsvc.yamlmetrics-svc.yaml执行部署 接入prometheus访问prometheus查看接入情况导入zookeeper监控模版监控展示 zookeeper集群部署 复制粘…

Linux 操作系统TCP、UDP

1、TCP服务器编写流程 头文件&#xff1a; #include <sys/socket.h> 1.1 创建套接字 函数原型&#xff1a; int socket(int domain, int type, int protocol); 参数&#xff1a; domain: 网域 AF_INET &#xff1a; IPv4 AF_INET6 &a…

第十五届蓝桥杯省赛大学B组(c++)

很幸运拿了辽宁赛区的省一,进入6月1号的国赛啦... 这篇文章主要对第十五届省赛大学B组(C)进行一次完整的复盘,这次省赛2道填空题6道编程题: A.握手问题 把握手情景看成矩阵: 粉色部分是7个不能互相捂手的情况 由于每个人只能和其他人捂手, 所以黑色情况是不算的 1和2握手2和…

Vue+OpenLayers7入门到实战:OpenLayers解析通过fetch请求的GeoJson格式数据,并叠加要素文字标注,以行政区划边界为例

返回《Vue+OpenLayers7》专栏目录:Vue+OpenLayers7入门到实战 前言 本章介绍如何使用OpenLayers7在地图上通过fetch请求geojson数据,然后通过OpenLayers解析为Feature要素叠加到图层上,并且通过动态设置标注方式显示要素属性为文字标注。 本章还是以行政区划边界为例,这个…

大模型LLM之SFT微调总结

一. SFT微调是什么 在大模型的加持下现有的语义理解系统的效果有一个质的飞跃&#xff1b;相对于之前的有监督的Pre-Train模型&#xff1b;大模型在某些特定的任务中碾压式的超过传统nlp效果&#xff1b;由于常见的大模型参数量巨大&#xff1b;在实际工作中很难直接对大模型训…

游戏陪玩平台app小程序H5源码交付游戏陪玩接单软件游戏陪玩源码 陪玩小程序陪玩工作室运营模式陪玩管理系统游戏陪玩工作室怎么做

提供陪玩平台源码&#xff0c;陪玩系统源码&#xff0c;陪玩app源码&#xff0c;团队各部门配备齐全&#xff0c;分工明确&#xff0c;及时对接开发进度&#xff0c;保证开发效率 一、陪玩平台源码的功能介绍 1、派单大厅:陪玩系统源码的派单大厅内支持用户通过语音连麦的方式…

Vue.js-----vue组件

能够说出vue生命周期能够掌握axios的使用能够了解$refs, $nextTick作用能够完成购物车案例 Vue 生命周期讲解 1.钩子函数 目标&#xff1a;Vue 框架内置函数&#xff0c;随着组件的生命周期阶段&#xff0c;自动执行 作用: 特定的时间点&#xff0c;执行特定的操作场景: 组…

硬性清空缓存的方法

前端发布代码后&#xff0c;我们是需要刷新页面再验证的。有时候仅仅f5 或者ctrlshiftdelete快捷键仍然有历史缓存&#xff0c;这时可以通过下面的方法硬性清空缓存。 以谷歌浏览器为例&#xff0c;打开f12&#xff0c;右键点击刷新按钮&#xff0c;选择【清空缓存并硬性加载】…

Windows只能安装在GPT磁盘上

转换磁盘分区形式 步骤1. 先按照正常流程使用Windows系统安装光盘或系统U盘引导计算机。 步骤2. 在Windows安装程序中点击“开始安装”&#xff0c;然后按ShiftF10打开命令提示符。 步骤3. 依次输入以下命令&#xff0c;并在每一行命令后按一次Enter键执行。 步骤4. 等待转换…

C++ | Leetcode C++题解之第78题子集

题目&#xff1a; 题解&#xff1a; class Solution { public:vector<int> t;vector<vector<int>> ans;void dfs(int cur, vector<int>& nums) {if (cur nums.size()) {ans.push_back(t);return;}t.push_back(nums[cur]);dfs(cur 1, nums);t.po…

自动驾驶学习2-毫米波雷达

1、简介 1.1 频段 毫米波波长短、频段宽,比较容易实现窄波束,雷达分辨率高,不易受干扰。波长介于1~10mm的电磁波,频率大致范围是30GHz~300GHz 毫米波雷达是测量被测物体相对距离、相对速度、方位的高精度传感器。 车载毫米波雷达主要有24GHz、60GHz、77GHz、79GHz四个频段。 …

深度学习基础之《TensorFlow框架(17)—卷积神经网络》

一、卷积神经网络介绍 1、背景 随着人工智能需求的提升&#xff0c;我们想要做复杂的图像识别&#xff0c;做自然语言处理&#xff0c;做语义分析翻译等等&#xff0c;多层神经网络的简单叠加显然力不从心 2、卷积神经网络与传统多层神经网络对比 &#xff08;1&#xff09;传…

实战 | 18行代码轻松实现人脸实时检测【附完整代码与源码详解】Opencv、人脸检测

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

【图像识别】Swin Transformer

一、引言 论文&#xff1a; Swin Transformer: Hierarchical Vision Transformer using Shifted Windows 作者&#xff1a; Microsoft Research Asia 代码&#xff1a; Swin Transformer 特点&#xff1a; 提出滑动窗口自注意力 (Shifted Window based Self-Attention) 解决Vi…

【3D基础】坐标转换——地理坐标投影到平面

汤国安版GIS原理第二章重点 1.常见投影方式 https://download.csdn.net/blog/column/9283203/83387473 Web Mercator投影&#xff08;Web Mercator Projection&#xff09;&#xff1a; 优点&#xff1a; 在 Web 地图中广泛使用&#xff0c;易于显示并与在线地图服务集成。在…

java.net.SocketInputStream.socketRead0 卡死导致 tomcat 线程池打满的问题

0 TL;DR; 问题与原因&#xff1a;某些特定条件下 java.net.SocketInputStream.socketRead0 方法会卡死&#xff0c;导致运行线程一直被占用导致泄露采用的方案&#xff1a;使用监控线程异步监控卡死事件&#xff0c;如果发生直接关闭网络连接释放链接以及对应的线程 1. 问题 …