一文粗通 Celery 分布式任务队列
目录
- 简介
- 什么是 Celery
- Celery 的基本组成
- Celery 的应用场景
- 快速开始
- 设置热重载开发脚本
- 基本任务管理
- 绑定任务本身
- 设置任务的执行超时时间
- 允许任务重试
- 自定义任务名称
- 实现任务优先级
- 高级任务管理
- 任务延迟执行
- 指定时间执行
- 任务超时自动取消
- 任务优先级
- 重试任务
- 任务链与工作流
- 简介
- 任务链 chain(顺序执行)
- 并发任务
- 并发完成后继续执行新任务
- 定时任务
- 快速开始
- 使用 crontab 配置
- 定时任务重试机制
- Worker 高级配置
- 并发数设置
- 任务预取
- 子进程自动回收
- 任务运行时间限制
- 链接
简介
什么是 Celery
Celery 是一个简单,灵活、可靠的分布式任务执行框架,可以支持大量任务的并发执行。Celery 采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行
它的核心功能是:
- 异步执行:把一些“慢任务”扔到后台去跑,不影响主程序的响应速度
- 定时任务:像闹钟一样,到点了自动触发执行任务
Celery 的基本组成
- Broker(消息中间件):任务的“中转站”,比如 RabbitMQ、Redis
- Worker(工作进程):真正执行任务的工人
- Task(任务):你定义好的需要异步执行的函数
- Result Backend(结果存储)(可选):存放任务的执行结果,比如也用 Redis、数据库等
Celery 的应用场景
- 发邮件(用户注册后异步发送邮件)
- 视频转码(上传完视频后后台慢慢处理)
- 周期性操作(比如每天凌晨清理数据库)
- 接口超时保护(主线程快速返回,慢处理放到后台)
快速开始
- 安装
uv init # 使用 uv 创建一个虚拟环境uv add celery # 安装 celery 库
如果你不知道 uv 是什么?你可以使用 pip 或者看一下这篇博客
uv 入门指南
- 简单示例
from celery import Celeryapp = Celery('tasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y
- 启动 Celery
uv run celery -A tasks worker --loglevel=INFO
注意:你不能在 windows 电脑上启动 Celery,不支持
如果你看到类似一下的内容那么启动 成功
-------------- celery@MBP.local v5.5.1 (immunity)
--- ***** -----
-- ******* ---- macOS-13.5-arm64-arm-64bit-Mach-O 2025-04-20 10:46:22
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x106df8590
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ------------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. tasks.add[2025-04-20 10:46:23,109: INFO/MainProcess] Connected to redis://localhost:6379/0
[2025-04-20 10:46:23,111: INFO/MainProcess] mingle: searching for neighbors
[2025-04-20 10:46:24,120: INFO/MainProcess] mingle: all alone
[2025-04-20 10:46:24,135: INFO/MainProcess] celery@MBP.local ready.
最重要的一个标记是,你看add
函数被 Celery 托管了。
设置热重载开发脚本
先说答案:严格来说,Celery 没有“真正意义上的热启动”。但是,可以通过一些技巧,做到“快速重载、不中断任务处理”,接近热启动的体验
uv add watchdoguv run watchmedo auto-restart --pattern="*.py" --recursive -- celery -A tasks worker --loglevel=info
利用 watchdog 包,监听 python 文件的变化,一变化就重新启动 Celery
这只能在开发环境使用,请不要在生产环境使用
基本任务管理
绑定任务本身
from celery import Celeryapp = Celery('tasks', broker='redis://localhost:6379/0')
import logging_logger = logging.getLogger(__name__)@app.task(bind=True)
def add(self, x, y):_logger.info(f"Task ID: {self.request.id}")return x + y
如果加了 bind=True
,函数第一个参数就变成了 self
(代表任务对象),你可以在任务里调用 self.retry()
、self.request
这些高级功能。
设置任务的执行超时时间
from celery import Celeryapp = Celery('tasks', broker='redis://localhost:6379/0')
# app.config_from_object('celeryconfig')
import logging
import time_logger = logging.getLogger(__name__)@app.task(bind=True, time_limit=3)
def add(self, x, y):_logger.info(f"Task ID: {self.request.id}")time.sleep(5) # Simulate a long-running taskreturn x + y
允许任务重试
@app.task(autoretry_for=(ConnectionError,),retry_kwargs={'max_retries': 3, 'countdown': 5},retry_backoff=True
)
def fetch_data_from_api(url):# 向某接口请求数据...
autoretry_for
: 指定遇到哪些异常时自动重试
retry_kwargs
: 设置最大重试次数、重试等待时间
retry_backoff
: 每次重试,等待时间指数级增长(比如 5 秒, 10 秒, 20 秒…)
自定义任务名称
默认任务名是 模块名.函数名
,比如 tasks.add
,你可以自己给任务起一个短一点的名字
@app.task(name="我的自定义任务")
def add(x, y):return x + y
实现任务优先级
注意,我们这里使用的 broker 是 Redis,不支持原生的优先级任务队列,不过我们可以使用参数设置
- 要靠建多个队列(比如:high、medium、low)
- 不同优先级的任务投到不同队列
- Worker 根据策略消费
@app.task(queue='high_priority')
def send_alert(msg):...
然后 Worker 专门监听 high_priority:
celery -A proj worker -Q high_priority,default
这并不是真正的优先级队列,但是面对九成的情况下应该够用了。
高级任务管理
之前的示例中,我们只介绍了 delay
方法调用 celery 任务,现在我们要介绍一个更高级的方法apply_async
方法。简单来说,
delay
是 apply_async
的简化版
任务延迟执行
from tasks import addadd.apply_async((4, 4), countdown=30) # 30秒后执行
指定时间执行
from datetime import datetime, timedeltafuture_time = datetime.utcnow() + timedelta(minutes=10)
add.apply_async((4, 4), eta=future_time) # 10分钟后执行
任务超时自动取消
add.apply_async((4, 4), countdown=60, expires=90)
# 任务60秒后执行,如果90秒后还没执行,就不执行了
任务优先级
如果 Broker 支持,比如 RabbitMQ,任务可以有优先级。Redis 默认不支持(需要多队列模拟)
add.apply_async((4, 4), priority=9) # 数字越大,优先级越高
重试任务
add.apply_async((2, 2),retry=True,retry_policy={'max_retries': 5, # 最多重试5次'interval_start': 0, # 初始重试等待0秒'interval_step': 0.2, # 每次增加0.2秒'interval_max': 5, # 最大重试间隔5秒}
)
任务链与工作流
简介
多个任务串联/组合起来,按照一定的执行顺序、规则运行。就像做菜,先切菜 → 再炒菜 → 最后装盘,一步步来!在 Celery 里,这种组合多个任务执行的方式,就叫做:
- 链(chain):一个任务接一个任务执行(顺序)
- 组(group):一批任务同时并行执行(并发)
- 链组混合(chord):先一批并行跑完,再继续下一步
- 嵌套复杂工作流:chain、group、chord 混合使用
任务链 chain(顺序执行)
多个任务,一个接一个执行,每个任务的输出作为下一个任务的输入
from celery import chain@app.task
def add(x, y):return x + y@app.task
def multiply(x, y):return x * y@app.task
def subtract(x, y):return x - y# 执行:先加,再乘,再减
workflow = chain(add.s(2, 3), # 2 + 3 = 5multiply.s(10), # 5 * 10 = 50subtract.s(7) # 50 - 7 = 43
)workflow.apply_async()
并发任务
group
允许你将多个任务包装在一起并并行执行。每个任务会异步地执行,并且 group
会在所有任务完成后返回一个结果列表
@app.task
def add(x, y):return x + y@app.task
def multiply(x, y):return x * y@app.task
def subtract(x, y):return x - y@app.task
def execute_group():# 使用 group 执行多个任务result = group(add.s(2, 3), multiply.s(4, 5), subtract.s(10, 5))()return result # 获取所有任务的结果
并发完成后继续执行新任务
先并行执行一批任务,全部执行完再统一执行下一步任务。这个方法需要设置结果后端
from celery import Celery, chain, groupapp = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
import logging
import timefrom celery import chord@app.task
def fetch_data(x):return x * 2@app.task
def aggregate(results):print(f"所有结果:{results}")# 先并行 fetch_data,然后聚合
workflow = chord([fetch_data.s(i) for i in range(5)], # 并行执行 5个任务aggregate.s() # 全部完成后聚合处理
)workflow.apply_async()
定时任务
Celery 是一个强大的分布式任务队列,它不仅支持异步任务执行,还能够管理定时任务。Celery 定时任务通常使用 Celery Beat 来实现,Celery Beat 是 Celery 的一个附加组件,用于定时调度任务
快速开始
(依赖多到吓人 👻 感觉还是不用为好)
uv add beat # 安装 beat
- 简单示例
@app.task
def add(x, y):return x + y# 配置定时任务
app.conf.beat_schedule = {'add-every-10-seconds': {'task': 'tasks.add','schedule': 1, # 每1秒钟执行一次'args': (16, 16), # 任务参数},
}app.conf.timezone = 'Asia/Shanghai'
除此这外,你还需要额外启动 beat 进程
uv run celery -A tasks beat --loglevel=info
使用 crontab 配置
from celery.schedules import crontabapp.conf.beat_schedule = {'add-every-day-at-noon': {'task': 'tasks.add','schedule': crontab(minute=0, hour=12), # 每天中午12点执行'args': (16, 16),},
}
我为啥不直接使用 crontab 🤪
定时任务重试机制
@app.task(bind=True, max_retries=3, default_retry_delay=5)
def add(self, x, y):try:print(f"Adding {x} + {y}")return x + yexcept Exception as exc:raise self.retry(exc=exc)
Worker 高级配置
并发数设置
决定 Worker 可以同时处理多少个任务。默认值是机器 CPU 核心数。
可以手动调整:
celery -A proj worker --concurrency=16
经验建议:
- CPU 密集型任务(如图像处理):
CPU核心数 × 2
- IO 密集型任务(如 API 请求):可以调到几百、上千,结合 gevent/eventlet
任务预取
Worker 一次最多提前拉多少个任务到本地
worker_prefetch_multiplier = 1
任务执行时间长/不稳定时,prefetch_multiplier=1 是最佳实践,避免个别 worker 抢太多任务导致阻塞
子进程自动回收
每个子进程执行完一定数量任务后重启,防止内存泄漏
worker_max_tasks_per_child = 100
- 生产环境一定要设置!
- 如果你的任务本身会引起内存飙升(比如处理大图片、PDF 生成),可以把值设小一点,比如 50
任务运行时间限制
防止任务卡死或死循环
task_time_limit = 600 # 硬超时,600秒后直接杀死
task_soft_time_limit = 550 # 软超时,550秒抛异常,你可以优雅退出
区别:
- 软超时:任务可以捕获异常,优雅退出
- 硬超时:超时直接杀掉,不给你收拾残局的机会
经验:
- 一般都设置,保证 Worker 长期稳定运行
- 软超时稍短于硬超时,比如差 10% 左右
链接
- 任务队列神器:Celery 入门到进阶指南
- ChatGPT
- DeepSeek