celery学习记录
[TOC]
基本概念
Celery 是一个分布式任务队列,用于处理实时任务。这些任务可以在分布式系统中的多个 worker 上并发执行,使其非常适合于处理需要高并发和异步任务的应用程序,如 Web 服务、数据处理任务等。以下是 Celery 的基本介绍和使用指南:
Celery 的基本概念
任务(Task): Celery 的基本单位,是需要执行的函数或方法。任务可以是Python函数,经过装饰器转换后可以被 Celery 调度执行
队列(Queue): 使用消息队列来分发任务。常用的消息代理包括 RabbitMQ 和 Redis
Worker: Celery 的 worker 是负责执行队列中任务的进程。可以在多台机器上运行多个 worker,以提高并发能力
Broker(消息代理): 负责传递任务消息,是 Celery 和 worker 之间的中介。常用的 broker 有 Redis 和 RabbitMQ
Backend(结果存储): 可选组件,用于存储任务结果,以便查询任务执行结果
安装和配置
安装 Celery:
使用 pip 安装 Celery 和选择的 broker(例如 Redis):1
2
3
4
5
6
7
8# 安装celery
pip install celery[redis]==5.4.0
# 安装监控工具
pip install flower==2.0.1
# 设置环境变量
export C_FORCE_ROOT=1配置 Celery:
用下面的docker-compose.yaml,可以快速启动一个
redis
和rabbitmq
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35version: '3.3' # Docker Compose 文件版本
services:
redis:
image: redis:latest # 使用官方 Redis 镜像
container_name: redis # 容器名称
ports:
- "6379:6379" # 暴露 Redis 的默认端口
networks:
- backend_network # 指定网络
volumes:
- redis_data:/data # 持久化 Redis 数据
rabbitmq:
image: rabbitmq:management # 使用带管理界面的官方 RabbitMQ 镜像
container_name: rabbitmq # 容器名称
ports:
- "5672:5672" # RabbitMQ 的默认通信端口
- "15672:15672" # RabbitMQ 管理界面端口
networks:
- backend_network # 指定网络
environment:
RABBITMQ_DEFAULT_USER: admin # 设置默认用户名
RABBITMQ_DEFAULT_PASS: admin # 设置默认密码
volumes:
- rabbitmq_data:/var/lib/rabbitmq # 持久化 RabbitMQ 数据
networks:
backend_network:
driver: bridge # 使用桥接网络
volumes:
redis_data:
driver: local
rabbitmq_data:
driver: local创建一个celery配置文件
celery_config.py
:1
2
3
4
5
6
7
8
9
10
11
12
13
14# 使用 RabbitMQ 作为消息代理
broker_url = "amqp://admin:admin@192.168.123.22:5672//"
# 使用 Redis 存储任务结果
result_backend = "redis://192.168.123.22:6379/0"
# 将任务序列化器和结果序列化器改为 Pickle,也支持json格式
task_serializer = 'pickle'
result_serializer = 'pickle'
accept_content = ['json', 'pickle']
# 时区
timezone = "Asia/Shanghai"
enable_utc = True
基本使用
定义任务:
使用@app.task
装饰器将一个函数转换为 Celery 任务,以下文件命名为tasks.py
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30from celery import Celery
# 创建 Celery 应用实例
app = Celery('tasks')
# 加载配置
app.config_from_object('celery_config')
class Res:
def __init__(self, res):
self.res = res
class Input:
def __init__(self, a, b):
self.a = a
self.b = b
# 定义任务
def inp(inp: Input):
return 2
# 定义任务
def add(x, y):
return x + y
def multiply(x, y):
return x * y启动 Worker:
使用 Celery CLI 启动 worker,准备处理任务,这里-A celery_app
指定了 Celery 应用实例的模块:1
2
3
4
5
6
7
8
9celery -A tasks celery_app --pool=prefork --loglevel=info -c 16 --max-tasks-per-child=1000
# 参数解释
-A tasks: 指定 Celery 应用所在的模块为 tasks
worker: 启动 Celery worker 进程
--pool=prefork: 使用 prefork 并发模型(多进程)
--loglevel=info: 设置日志级别为 info,用于输出信息级日志
-c 16: 并发工作进程数量为 16
--max-tasks-per-child=1000: 每个工作进程处理 1000 个任务后重启,以管理内存和稳定性根据你的应用场景选择合适的池:
- CPU 密集型任务: 推荐使用 prefork
- I/O 密集型任务: 推荐使用 eventlet 或 gevent
- 调试和简单测试: 使用 solo
- 需要共享内存的场景: 使用 threads
发送任务:
在 Python 环境中或应用程序中调用任务,定义app.py
文件如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64#!/usr/bin/env python
# -- coding: utf-8 --
"""
@version: v1.0
@author: huangyc
@file: app.py
@description: 任务管理和执行示例
@time: 2025/1/27 20:30
"""
import time
from basic_support.comm_funcs.utils_func import time_this_function
from basic_support.logger.logger_config import logger
from celery import group
from celery.result import AsyncResult
from tasks import add, multiply, inp, Input, app
def get_task_res(task_id: str):
"""获取任务结果并检查状态"""
result = AsyncResult(task_id, app=app)
if result.ready():
logger.info(f"任务已完成,结果为: {result.get()}")
else:
logger.info("任务尚未完成")
def run():
"""异步任务执行示例"""
task_ids = []
for i in range(1000):
result = inp.delay(Input(4, i))
task_ids.append(result.id)
logger.info(f"Task ID: {result.id}")
result2 = multiply.delay(3, 7)
task_ids.append(result2.id)
for idx, task_id in enumerate(task_ids):
result = AsyncResult(task_id, app=app)
logger.info(f"idx: {idx}, result is: {result.get()}")
def run_group():
"""使用 group 将任务组合并发执行"""
tasks = [inp.s(Input(4, i)) for i in range(1000)]
tasks.append(multiply.s(3, 7))
half_task_num = len(tasks) // 2
job = group(tasks[:half_task_num])
job2 = group(tasks[half_task_num:])
results = job.apply_async().get()
result2 = job2.apply_async().get()
for idx, res in enumerate(results):
logger.info(f"idx: {idx}, result is: {res}")
for idx, res in enumerate(result2):
logger.info(f"idx: {idx}, result is: {res}")
if __name__ == '__main__':
run()
run_group()result.get()
用于获取任务的返回结果
高级功能
定时任务:Celery 可以与
celery beat
配合使用,定时调度任务,在app.py
中加入:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48from celery.schedules import crontab
app.conf.beat_schedule = {
# 定义一个名为 'add-every-30-seconds' 的周期性任务
'add-every-30-seconds': {
# 指定要执行的任务名称(路径)
'task': 'celery_app.add',
# 设置任务的执行周期为 30 秒
'schedule': 30.0,
# 传递给任务的参数
'args': (16, 16),
},
}
# 更多示例任务配置
app.conf.beat_schedule.update({
# 每天午夜执行清理任务
'cleanup-every-midnight': {
'task': 'celery_app.cleanup',
# 使用 crontab 表达式设置为每天午夜执行
'schedule': crontab(hour=0, minute=0),
'args': (),
},
# 每小时执行一次数据备份
'backup-every-hour': {
'task': 'celery_app.backup',
# 定时器设置为每小时执行
'schedule': crontab(minute=0),
'args': ('database_name',),
},
# 每周一早上8点发送报告
'send-report-every-monday': {
'task': 'celery_app.send_report',
# 使用 crontab 设置每周一早上8点执行
'schedule': crontab(hour=8, minute=0, day_of_week=1),
'args': ('weekly_report',),
},
# 每隔5分钟检查系统状态
'check-system-status-every-5-minutes': {
'task': 'celery_app.check_status',
# 每5分钟执行一次
'schedule': crontab(minute='*/5'),
'args': (),
},
})错误处理:
Celery 提供了任务重试机制和错误处理选项,可以在任务定义中配置:1
2
3
4
5
6
7
def fail_task(self):
try:
# 可能失败的操作
pass
except Exception as exc:
raise self.retry(exc=exc, countdown=60)结果存储:
配置backend
后,可以存储任务执行结果,支持 Redis、数据库等多种选项
总结
Celery 是一个强大而灵活的工具,适用于构建分布式、高并发和异步任务处理的应用程序
在使用 Celery 时,选择合适的 broker 和 backend 以及合理配置 worker 数量和并发模式,可以大大提高程序的性能和可靠性
在实际应用中,可以根据需要扩展和使用 Celery 提供的各种高级功能,如定时任务、任务链、工作流等