Python消息队列教程_Celery异步任务实践

Celery 是 Python 最成熟分布式异步任务队列框架,核心是可靠发送任务、稳定执行与清晰反馈结果;选 Broker 时,Redis 适合开发测试,RabbitMQ 更适企业级生产;任务需可序列化、无状态、参数精简;启动 worker 和调用任务须分离,避免同步执行;定时任务应使用 Celery Beat 而非 crontab。

Celery 是 Python 生态中最成熟、最常用的分布式异步任务队列框架。它不负责“造轮子”,而是专注做好三件事:把任务可靠地发出去、让 worker 稳定地接住并执行、把结果或状态清晰地反馈回来。用好 Celery 的关键,不是堆功能,而是理解它的运行逻辑和常见陷阱。

消息中间件怎么选?Redis 和 RabbitMQ 各有什么侧重

Broker 是 Celery 的通信枢纽,选错会影响稳定性、扩展性和运维成本。

  • Redis:上手快、轻量、支持发布/订阅和简单队列,适合中小项目或开发测试;但原生不支持消息持久化(需手动配置 RDB/AOF)、无优先级队列、集群模式下故障转移较弱
  • RabbitMQ:企业级首选,天然支持消息确认、死信队列、优先级、延迟插件、多租户和高可用集群;配置稍复杂,但长期看更省心,尤其在金融、订单类场景中优势明显
  • 小建议:本地开发用 Redis(pip install redis),上线前评估是否切换 RabbitMQ;避免用 SQLite 或文件系统做 broker —— 它们不满足并发与可靠性要求

任务定义要“干净”:可序列化、无状态、参数精简

任务函数被发送到 worker 进程执行,跨进程意味着不能传对象引用、闭包或数据库连接。

  • 只用基础类型传参:int、str、dict、list、bool,避免 datetime 对象(改用 ISO 格式字符串)或自定义类实例
  • 不要在任务里直接用 requestcurrent_app 或 Django 的 user 对象;应传 ID,进任务后再查库重建上下文
  • @app.task(bind=True, retry=True, autoretry_for=(ConnectionError,), max_retries=3) 显式控制重试行为,比默认策略更可控
  • 设置超时:用 soft_time_limit=30 防卡死,time_limit=45 强制终止(单位秒)

启动与调用:两步走清,别混在一起

很多人卡在“写了任务却没反应”,问题常出在启动方式或调用姿势不对。

  • 启动 worker:在 tasks.py 所在目录运行 celery -A tasks worker --loglevel=info;注意 -A 后跟的是模块名(不含 .py),不是文件路径
  • 触发任务:用 .delay() 发送异步任务(推荐),或 .apply_async(args=[1,2], countdown=60) 延迟 60 秒执行
  • 获取结果:仅当配置了 result_backend(如 backend='redis://localhost:6379/1')后,result.get(timeout=10) 才有效;否则会一直阻塞或报错
  • 常见误区:在 Flask/Django 视图里直接调用 add(4, 6)(同步执行)—— 这完全绕过了 Celery,起不到异步作用

定时任务别靠 crontab 模拟,用 Celery Beat 管理更稳

Celery Beat 是独立调度进程,不是 crontab 的 Python 封装,它必须和 worker 共用同一套 broker 和 backend。

  • celeryconfig.py 中配置:beat_schedule = {'daily-report': {'task': 'tasks.send_report', 'schedule': 3600}}(每小时一次)
  • 启动命令:celery -A tasks beat --loglevel=info;生产环境建议搭配 supervisord 或 systemd 管理进程生命周期
  • 动态周期任务(如运营后台可增删的推送计划)推荐用 django-celery-beat,它把调度规则存在数据库里,无需重启服务
  • 重要提醒:Beat 不执行任务,只发任务;如果 worker 挂了,任务会积压在 broker 中,等 worker 恢复后继续消费