-
- 1.config
-
- 1.1 通过app.conf进行配置
- 1.2 通过app.conf.update进行配置
- 1.3 通过配置文件进行配置
- 1.4 通过配置类的方式进行配置
- 2.任务相关
-
- 2.1 任务基类(base)
- 2.2 任务名称(name)
- 2.3 任务请求(request)
- 2.4 任务重试(retry)
-
- 2.4.1 指定最大重试次数
- 2.4.2 设置重试间隔时间
- 2.4.3 最大重试次数时报告的自定义异常
- 2.4.4 重试指定异常
- 3.序列化
- 4.后台操作worker和启动多个worker
- 5.和django集成
1.config
默认配置文件在源码中的celery.app.defaults中,可以看到各种的配置
目录结构
study- proj # 包- __init__.py # 必须含有__init__.py文件- celery.py # 实例化Celery,配置broker和backend相关- tasks.py # 任务- add_task1.py # 添加任务- config.py # 配置相关
proj/celery.py
from celery import Celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend, include=['proj.tasks'])
proj/tasks.py
from .celery import app@app.task
def add(x, y):return x + y@app.task
def mul(x, y):return x * y
add_task1.py
from proj.tasks import add, mul# 执行异步任务
result = add.delay(1, 2)
result1 = mul.delay(8, 2)
print(result)
print(result1)
1.1 通过app.conf进行配置
# 在proj/celery.py下进行编辑app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
1.2 通过app.conf.update进行配置
# 在proj/celery.py下进行编辑app.conf.update(enable_utc=False,timezone='Asia/Shanghai',
)
1.3 通过配置文件进行配置
# 在config.py中添加以下内容
timezone = 'Asia/Shanghai'
enable_utc = False# 在proj/celery.py添加以下内容
app.config_from_object('config')# 可以在控制台输入python -m config查看配置是否正确
1.4 通过配置类的方式进行配置
# 在config.py中添加以下内容
class Config:timezone = 'Asia/Shanghai'enable_utc = False# 在proj/celery.py添加以下内容
from config import Configapp.config_from_object(Config)
2.任务相关
目录结构
study- proj # 包- __init__.py # 必须含有__init__.py文件- celery.py # 实例化Celery,配置broker和backend相关- tasks.py # 任务- add_task1.py # 添加任务- config.py # 配置相关
proj/celery.py
from celery import Celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend, include=['proj.tasks'])
proj/tasks.py
from .celery import app@app.task
def add(x, y):return x + y@app.task
def mul(x, y):return x * y
add_task1.py
from proj.tasks import add, mul# 执行异步任务
result = add.delay(1, 2)
result1 = mul.delay(8, 2)
print(result)
print(result1)
2.1 任务基类(base)
proj/tasks.py
# 写一个类通过继承celery.Task完成任务基类
# 重写指定方法实现执行成功、失败、重试打印对应信息
import celery
from .celery import appclass CeleryTask(celery.Task):def on_failure(self, exc, task_id, args, kwargs, einfo):""":param exc:任务引发的异常:param task_id:失败任务的唯一id:param args:失败任务的原始参数:param kwargs:失败任务的原始关键字参数:param einfo:异常信息"""print(f'{task_id} 失败,异常为: {exc}')def on_retry(self, exc, task_id, args, kwargs, einfo):""":param exc:任务引发的异常:param task_id:失败任务的唯一id:param args:失败任务的原始参数:param kwargs:失败任务的原始关键字参数:param einfo:异常信息"""print(f'{task_id} 重试异常为: {exc}')def on_success(self, retval, task_id, args, kwargs):""":param retval: 任务的返回值。:param task_id: 已执行任务的唯一id。:param args: 已执行任务的原始参数。:param kwargs: 已执行任务的原始关键字参数。:return: """print(f'{task_id} 成功,结果为: {retval}')# 通过装饰器传参将写的类传入
@app.task(base=CeleryTask)
def add(x, y):return x + y# 通过装饰器传参将写的类传入
@app.task(base=CeleryTask)
def mul(x, y):return x * y
启动worker
# linux启动
celery -A proj worker -l info
# windows启动
celery -A proj worker -l info -P eventlet# 启动worker后添加任务,运行python add_task1.py(需要cd到add_task1所在的目录下)---console----------------- celery@DESKTOP-IPV07D4 v5.2.3 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.19041-SP0 2022-04-07 14:22:19
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: proj.celery:0x1de2ae29688
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ------------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. proj.tasks.add. proj.tasks.mul[2022-04-07 14:22:19,329: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2022-04-07 14:22:19,331: INFO/MainProcess] mingle: searching for neighbors
[2022-04-07 14:22:20,340: INFO/MainProcess] mingle: all alone
[2022-04-07 14:22:20,345: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/1.
[2022-04-07 14:22:20,349: INFO/MainProcess] celery@DESKTOP-IPV07D4 ready.
[2022-04-07 14:22:37,055: INFO/MainProcess] Task proj.tasks.add[ee9576c2-4db6-4781-b98f-54bd50f2fb71] received
[2022-04-07 14:22:37,058: INFO/MainProcess] Task proj.tasks.mul[9a0dd6be-13fb-4093-a332-ee6675a24ebf] received
[2022-04-07 14:22:37,061: WARNING/MainProcess] ee9576c2-4db6-4781-b98f-54bd50f2fb71 成功,结果为: 3
[2022-04-07 14:22:37,061: INFO/MainProcess] Task proj.tasks.add[ee9576c2-4db6-4781-b98f-54bd50f2fb71] succeeded in 0.0s: 3
[2022-04-07 14:22:37,062: WARNING/MainProcess] 9a0dd6be-13fb-4093-a332-ee6675a24ebf 成功,结果为: 16
[2022-04-07 14:22:37,062: INFO/MainProcess] Task proj.tasks.mul[9a0dd6be-13fb-4093-a332-ee6675a24ebf] succeeded in 0.0s: 16
2.2 任务名称(name)
目录结构
study- proj # 包- __init__.py # 必须含有__init__.py文件- celery.py # 实例化Celery,配置broker和backend相关- tasks.py # 任务- add_task1.py # 添加任务- config.py # 配置相关
proj/celery.py
from celery import Celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend, include=['proj.tasks'])
proj/tasks.py
from .celery import app@app.task
def add(x, y):return x + y@app.task
def mul(x, y):return x * y
add_task1.py
from proj.tasks import add, mul# 执行异步任务
result = add.delay(1, 2)
result1 = mul.delay(8, 2)
print(result)
print(result1)
每一个任务都必须有一个唯一任务名称。如果没有指定任务名称,装饰器会根据当前任务所在的模块以及任务函数的名称进行生成一个,例如现在启动workercelery -A proj worker -l info -P eventlet
后,控制台中可以看到任务名称,如下图
如果任务过多,可以通过自己命名方式进行命名
tasks.py
from .celery import app@app.task(name='add')
def add(x, y):return x + y@app.task(name='mul')
def mul(x, y):return x * y
此时启动worker,可以看到名称已修改成对应的名称,运行任务处的名称同样已修改。
2.3 任务请求(request)
目录结构
study- proj # 包- __init__.py # 必须含有__init__.py文件- celery.py # 实例化Celery,配置broker和backend相关- tasks.py # 任务- add_task1.py # 添加任务- config.py # 配置相关
proj/celery.py
from celery import Celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend, include=['proj.tasks'])
proj/tasks.py
from .celery import app@app.task
def add(x, y):return x + y@app.task
def mul(x, y):return x * y
request包含与当前执行任务相关的信息和状态
tasks.py
# bind 参数表示该函数绑是一个绑定方法,可以通过访问任务类型实例中的属性和方法
@app.task(bind=True)
def add(self, x, y):print(self.request)return x + y
# 启动worker后,将任务添加,可以看到worker处打印下方信息---console---
<Context: {'lang': 'py', 'task': 'proj.tasks.add', 'id': '7b347ce7-18c8-4afb-be37-8063503780a0', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None]
, 'root_id': '7b347ce7-18c8-4afb-be37-8063503780a0', 'parent_id': None, 'argsrepr': '(1, 2)', 'kwargsrepr': '{}', 'origin': 'gen15340@DESKTOP-IPV07D4', 'ignore_result': False, 'properties': {'correlation_id': '7b347ce7-18c8-4afb-be37-8063503780a0', 'reply_to': '76f
85df8-e5b9-37cc-8d03-66b19e61ebee', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': 'df94e00a-033a-4f29-8ab0-c4fd3f33326c'}, 'reply_to': '76f85df8-e5b9-37cc-8d03-66b19e61ebee'
, 'correlation_id': '7b347ce7-18c8-4afb-be37-8063503780a0', 'hostname': 'celery@DESKTOP-IPV07D4', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [1, 2], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'er
rbacks': None, 'chain': None, 'chord': None, 'called_directly': False, '_protected': 1}>
2.4 任务重试(retry)
当任务执行出错后,自动重试任务,可指定发生指定异常重试,设置重试次数,设置重试间隔时间等
目录结构
study- proj # 包- __init__.py # 必须含有__init__.py文件- celery.py # 实例化Celery,配置broker和backend相关- tasks.py # 任务- add_task1.py # 添加任务- config.py # 配置相关
proj/celery.py
from celery import Celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend, include=['proj.tasks'])
proj/tasks.py
from .celery import app@app.task
def add(x, y):return x + y@app.task
def mul(x, y):return x * y
add_task1.py
from proj.tasks import add, mul# 执行异步任务
result = add.delay(1, 2)
print(result)
在tasks.py中修改代码
加粗样式
@app.task(bind=True)
def add(self, x, y):try:z = x + yreturn zexcept Exception as e:raise self.retry()
修改add_task1.py让执行任务时发生异常
result = add.delay(['1'], 2)
print(result)
- 启动worker
- 添加任务
此时观察控制台
---console----------------- celery@DESKTOP-IPV07D4 v5.2.3 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.19041-SP0 2022-04-07 16:14:45
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: proj.celery:0x13f283ea5c8
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ------------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. proj.tasks.add. proj.tasks.mul[2022-04-07 16:14:45,885: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2022-04-07 16:14:45,887: INFO/MainProcess] mingle: searching for neighbors
[2022-04-07 16:14:46,900: INFO/MainProcess] mingle: all alone
[2022-04-07 16:14:46,907: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/1.
[2022-04-07 16:14:46,909: INFO/MainProcess] celery@DESKTOP-IPV07D4 ready.
[2022-04-07 16:17:31,285: INFO/MainProcess] Task proj.tasks.add[268b203a-1291-4e24-854f-686edf9a09f3] received
[2022-04-07 16:17:31,290: INFO/MainProcess] Task proj.tasks.add[268b203a-1291-4e24-854f-686edf9a09f3] received
[2022-04-07 16:17:31,292: INFO/MainProcess] Task proj.tasks.add[268b203a-1291-4e24-854f-686edf9a09f3] retry: Retry in 3s
[2022-04-07 16:20:31,302: INFO/MainProcess] Task proj.tasks.add[268b203a-1291-4e24-854f-686edf9a09f3] received
[2022-04-07 16:20:31,305: INFO/MainProcess] Task proj.tasks.add[268b203a-1291-4e24-854f-686edf9a09f3] retry: Retry in 3s
[2022-04-07 16:23:31,314: INFO/MainProcess] Task proj.tasks.add[268b203a-1291-4e24-854f-686edf9a09f3] received
[2022-04-07 16:23:31,317: INFO/MainProcess] Task proj.tasks.add[268b203a-1291-4e24-854f-686edf9a09f3] retry: Retry in 3s
[2022-04-07 16:26:31,312: ERROR/MainProcess] Task proj.tasks.add[268b203a-1291-4e24-854f-686edf9a09f3] raised unexpected: MaxRetriesExceededError("Can't retry proj.tasks.add[268b203a-1291-4e24-854f-686edf9a09f3] args:(['1'], 2) kwargs:{}")
Traceback (most recent call last):File "C:\Users\Dell\Desktop\study_celery\test\proj\tasks.py", line 49, in addz = x + y
TypeError: can only concatenate list (not "int") to listDuring handling of the above exception, another exception occurred:Traceback (most recent call last):File "D:\Virtualenvs\celery\lib\site-packages\celery\app\trace.py", line 451, in trace_taskR = retval = fun(*args, **kwargs)File "D:\Virtualenvs\celery\lib\site-packages\celery\app\trace.py", line 734, in __protected_call__return self.run(*args, **kwargs)File "C:\Users\Dell\Desktop\study_celery\test\proj\tasks.py", line 53, in addraise self.retry()File "D:\Virtualenvs\celery\lib\site-packages\celery\app\task.py", line 721, in retry), task_args=S.args, task_kwargs=S.kwargs
celery.exceptions.MaxRetriesExceededError: Can't retry proj.tasks.add[268b203a-1291-4e24-854f-686edf9a09f3] args:(['1'], 2) kwargs:{}
发现每次重试间隔180s,默认为180s,重试三次后触发MaxRetriesExceededError,默认重试三次
2.4.1 指定最大重试次数
@app.task(bind=True)
def add(self, x, y):try:z = x + yreturn zexcept Exception as e:raise self.retry(max_retries=4)
通过max_retries参数指定最大重试次数,重试指定次数后如果还有异常会触发MaxRetriesExceededError
2.4.2 设置重试间隔时间
@app.task(bind=True, default_retry_delay=3)
def add(self, x, y):try:z = x + yreturn zexcept Exception as e:raise self.retry(max_retries=4)# raise self.retry(max_retries=4,countdown=6)
通过此参数default_retry_delay设置重试间隔,单位为秒,默认为3分钟;也可以通过retry中的countdown参数进行设置,countdown参数会覆盖default_retry_delay,二者用其一即可
2.4.3 最大重试次数时报告的自定义异常
@app.task(bind=True, default_retry_delay=3)
def add(self, x, y):try:z = x + yreturn zexcept Exception as e:raise self.retry(exc=e, max_retries=4)
设置exc参数,执行指定重试次数,如果还发生异常,将发生的异常为指定的异常
---console---
[2022-04-07 16:35:48,697: ERROR/MainProcess] Task proj.tasks.add[6e872f2d-3aa4-4873-87f1-28a86abdac90] raised unexpected: TypeError('can only concatenate list (not "int") to list')
Traceback (most recent call last):File "D:\Virtualenvs\celery\lib\site-packages\celery\app\trace.py", line 451, in trace_taskR = retval = fun(*args, **kwargs)File "D:\Virtualenvs\celery\lib\site-packages\celery\app\trace.py", line 734, in __protected_call__return self.run(*args, **kwargs)File "C:\Users\Dell\Desktop\study_celery\test\proj\tasks.py", line 53, in addraise self.retry(exc=e, max_retries=4)File "D:\Virtualenvs\celery\lib\site-packages\celery\app\task.py", line 717, in retryraise_with_context(exc)File "C:\Users\Dell\Desktop\study_celery\test\proj\tasks.py", line 49, in addz = x + y
TypeError: can only concatenate list (not "int") to list
2.4.4 重试指定异常
# 当发生TypeError异常时,会重试5次,每次重试时间为60s,不发生TypeError异常终止
@app.task(autoretry_for=(TypeError,), retry_kwargs={'max_retries': 5, 'countdown': 60})
def add(x, y):return x + y
3.序列化
标识需要使用默认序列化的字符串。默认为 json,也可以为 pickle、json、yaml 或者通过 kombu.serialization.registry注册的自定义序列化方法。
config.py
app.conf.update(task_serializer='json',accept_content=['json'], result_serializer='json',
)
4.后台操作worker和启动多个worker
# 好像不支持windows后台运行worker,在linux尝试成功了
# 以下为linux中使用# start为启动,对应还有stop,restart
# w1为别名,当别名为数字的时候会启动多个worker# 启动一个名称为w1的后台worker任务
celery multi start w1 -A proj -l info# 启动10个后台worker任务,默认名称为celery1····10
celery multi start 10 -A proj -l info
默认情况下会在当前目录中创建pid文件和日志文件,为防止多个worker干扰,建议将这些文件存放在专门的目录中,默认pid存放在/var/run/celery下,log存放在/var/log/celery/下,也可以自己进行指定
celery multi start w1 -A proj -l info --pidfile=./pid/%n.pid --logfile=./log/%n.log
5.和django集成
目录结构
# 在项目根路径下创建celery_task
celery_task__init__.pycelery.py tasks.py
celery.py
import os
import django
from celery import Celery# 一定要加载django的环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'phoneapi.settings.dev')
django.setup()broker = 'redis://127.0.0.1:6379/1' # 任务储存
backend = 'redis://127.0.0.1:6379/2' # 结果存储,执行完之后结果放在这app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])# 指令
# celery beat -A celery_task -l info
# 任务的定时配置
from datetime import timedeltaapp.conf.beat_schedule = {'low-task': {'task': 'celery_task.tasks.get_all_user','schedule': timedelta(seconds=30),}
}
tasks.py
from .celery import app@app.task
def get_all_user():# 一定要在此处导入django中的操作,避免循环导入问题from home import modelsyour code write here
启动
celery beat -A celery_task -l info# windows
celery worker -A celery_task -l info -P eventlet# linux
celery worker -A celery_task -l info