[TOC]


Celery 中文手册

Celery配置及使用

一文读懂 Python 分布式任务队列 celery

celery 爬坑

基本概念

Celery 是一个分布式任务队列,用于处理实时任务。这些任务可以在分布式系统中的多个 worker 上并发执行,使其非常适合于处理需要高并发和异步任务的应用程序,如 Web 服务、数据处理任务等。以下是 Celery 的基本介绍和使用指南:

Celery 的基本概念

  1. 任务(Task): Celery 的基本单位,是需要执行的函数或方法。任务可以是Python函数,经过装饰器转换后可以被 Celery 调度执行

  2. 队列(Queue): 使用消息队列来分发任务。常用的消息代理包括 RabbitMQRedis

  3. Worker: Celery 的 worker 是负责执行队列中任务的进程。可以在多台机器上运行多个 worker,以提高并发能力

  4. Broker(消息代理): 负责传递任务消息,是 Celery 和 worker 之间的中介。常用的 broker 有 Redis 和 RabbitMQ

  5. Backend(结果存储): 可选组件,用于存储任务结果,以便查询任务执行结果

celery框架图

安装和配置

  1. 安装 Celery:
    使用 pip 安装 Celery 和选择的 broker(例如 Redis):

    1
    2
    3
    4
    5
    6
    7
    8
    # 安装celery
    pip install celery[redis]==5.4.0

    # 安装监控工具
    pip install flower==2.0.1

    # 设置环境变量
    export C_FORCE_ROOT=1
  2. 配置 Celery:

    用下面的docker-compose.yaml,可以快速启动一个redisrabbitmq

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    version: '3.3'  # Docker Compose 文件版本
    services:
    redis:
    image: redis:latest # 使用官方 Redis 镜像
    container_name: redis # 容器名称
    ports:
    - "6379:6379" # 暴露 Redis 的默认端口
    networks:
    - backend_network # 指定网络
    volumes:
    - redis_data:/data # 持久化 Redis 数据

    rabbitmq:
    image: rabbitmq:management # 使用带管理界面的官方 RabbitMQ 镜像
    container_name: rabbitmq # 容器名称
    ports:
    - "5672:5672" # RabbitMQ 的默认通信端口
    - "15672:15672" # RabbitMQ 管理界面端口
    networks:
    - backend_network # 指定网络
    environment:
    RABBITMQ_DEFAULT_USER: admin # 设置默认用户名
    RABBITMQ_DEFAULT_PASS: admin # 设置默认密码
    volumes:
    - rabbitmq_data:/var/lib/rabbitmq # 持久化 RabbitMQ 数据

    networks:
    backend_network:
    driver: bridge # 使用桥接网络

    volumes:
    redis_data:
    driver: local
    rabbitmq_data:
    driver: local

    创建一个celery配置文件celery_config.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    # 使用 RabbitMQ 作为消息代理
    broker_url = "amqp://admin:admin@192.168.123.22:5672//"

    # 使用 Redis 存储任务结果
    result_backend = "redis://192.168.123.22:6379/0"

    # 将任务序列化器和结果序列化器改为 Pickle,也支持json格式
    task_serializer = 'pickle'
    result_serializer = 'pickle'
    accept_content = ['json', 'pickle']

    # 时区
    timezone = "Asia/Shanghai"
    enable_utc = True

基本使用

  1. 定义任务:
    使用 @app.task 装饰器将一个函数转换为 Celery 任务,以下文件命名为tasks.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    from celery import Celery

    # 创建 Celery 应用实例
    app = Celery('tasks')

    # 加载配置
    app.config_from_object('celery_config')

    class Res:
    def __init__(self, res):
    self.res = res

    class Input:
    def __init__(self, a, b):
    self.a = a
    self.b = b

    # 定义任务
    @app.task
    def inp(inp: Input):
    return 2

    # 定义任务
    @app.task
    def add(x, y):
    return x + y

    @app.task
    def multiply(x, y):
    return x * y
  2. 启动 Worker:
    使用 Celery CLI 启动 worker,准备处理任务,这里 -A celery_app 指定了 Celery 应用实例的模块:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    celery -A tasks celery_app --pool=prefork --loglevel=info -c 16 --max-tasks-per-child=1000

    # 参数解释
    -A tasks: 指定 Celery 应用所在的模块为 tasks
    worker: 启动 Celery worker 进程
    --pool=prefork: 使用 prefork 并发模型(多进程)
    --loglevel=info: 设置日志级别为 info,用于输出信息级日志
    -c 16: 并发工作进程数量为 16
    --max-tasks-per-child=1000: 每个工作进程处理 1000 个任务后重启,以管理内存和稳定性

    根据你的应用场景选择合适的池:

    • CPU 密集型任务: 推荐使用 prefork
    • I/O 密集型任务: 推荐使用 eventlet 或 gevent
    • 调试和简单测试: 使用 solo
    • 需要共享内存的场景: 使用 threads
  3. 发送任务:
    在 Python 环境中或应用程序中调用任务,定义app.py文件如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    #!/usr/bin/env python
    # -- coding: utf-8 --

    """
    @version: v1.0
    @author: huangyc
    @file: app.py
    @description: 任务管理和执行示例
    @time: 2025/1/27 20:30
    """

    import time
    from basic_support.comm_funcs.utils_func import time_this_function
    from basic_support.logger.logger_config import logger
    from celery import group
    from celery.result import AsyncResult
    from tasks import add, multiply, inp, Input, app

    def get_task_res(task_id: str):
    """获取任务结果并检查状态"""
    result = AsyncResult(task_id, app=app)
    if result.ready():
    logger.info(f"任务已完成,结果为: {result.get()}")
    else:
    logger.info("任务尚未完成")

    @time_this_function
    def run():
    """异步任务执行示例"""
    task_ids = []
    for i in range(1000):
    result = inp.delay(Input(4, i))
    task_ids.append(result.id)
    logger.info(f"Task ID: {result.id}")

    result2 = multiply.delay(3, 7)
    task_ids.append(result2.id)

    for idx, task_id in enumerate(task_ids):
    result = AsyncResult(task_id, app=app)
    logger.info(f"idx: {idx}, result is: {result.get()}")

    @time_this_function
    def run_group():
    """使用 group 将任务组合并发执行"""
    tasks = [inp.s(Input(4, i)) for i in range(1000)]
    tasks.append(multiply.s(3, 7))

    half_task_num = len(tasks) // 2
    job = group(tasks[:half_task_num])
    job2 = group(tasks[half_task_num:])

    results = job.apply_async().get()
    result2 = job2.apply_async().get()

    for idx, res in enumerate(results):
    logger.info(f"idx: {idx}, result is: {res}")

    for idx, res in enumerate(result2):
    logger.info(f"idx: {idx}, result is: {res}")

    if __name__ == '__main__':
    run()
    run_group()

    result.get() 用于获取任务的返回结果

高级功能

  1. 定时任务:Celery 可以与 celery beat 配合使用,定时调度任务,在 app.py 中加入:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    from celery.schedules import crontab

    app.conf.beat_schedule = {
    # 定义一个名为 'add-every-30-seconds' 的周期性任务
    'add-every-30-seconds': {
    # 指定要执行的任务名称(路径)
    'task': 'celery_app.add',
    # 设置任务的执行周期为 30 秒
    'schedule': 30.0,
    # 传递给任务的参数
    'args': (16, 16),
    },
    }

    # 更多示例任务配置
    app.conf.beat_schedule.update({
    # 每天午夜执行清理任务
    'cleanup-every-midnight': {
    'task': 'celery_app.cleanup',
    # 使用 crontab 表达式设置为每天午夜执行
    'schedule': crontab(hour=0, minute=0),
    'args': (),
    },

    # 每小时执行一次数据备份
    'backup-every-hour': {
    'task': 'celery_app.backup',
    # 定时器设置为每小时执行
    'schedule': crontab(minute=0),
    'args': ('database_name',),
    },

    # 每周一早上8点发送报告
    'send-report-every-monday': {
    'task': 'celery_app.send_report',
    # 使用 crontab 设置每周一早上8点执行
    'schedule': crontab(hour=8, minute=0, day_of_week=1),
    'args': ('weekly_report',),
    },

    # 每隔5分钟检查系统状态
    'check-system-status-every-5-minutes': {
    'task': 'celery_app.check_status',
    # 每5分钟执行一次
    'schedule': crontab(minute='*/5'),
    'args': (),
    },
    })
  2. 错误处理:
    Celery 提供了任务重试机制和错误处理选项,可以在任务定义中配置:

    1
    2
    3
    4
    5
    6
    7
    @app.task(bind=True, max_retries=3)
    def fail_task(self):
    try:
    # 可能失败的操作
    pass
    except Exception as exc:
    raise self.retry(exc=exc, countdown=60)
  3. 结果存储:
    配置 backend 后,可以存储任务执行结果,支持 Redis、数据库等多种选项

总结

Celery 是一个强大而灵活的工具,适用于构建分布式、高并发和异步任务处理的应用程序

在使用 Celery 时,选择合适的 broker 和 backend 以及合理配置 worker 数量和并发模式,可以大大提高程序的性能和可靠性

在实际应用中,可以根据需要扩展和使用 Celery 提供的各种高级功能,如定时任务、任务链、工作流等