[异步任务处理]Celery + RabbitMQ

Celery 是一个使用 Python 开发的分布式任务队列框架,广泛用于处理异步任务定时任务。它通过消息中间件(Broker)将耗时或复杂的任务分发给一个或多个工作节点(Worker)进行异步执行,适用于高并发、任务解耦、批量数据处理、后台服务等场景。

Celery解析

Celery/RabbitMQ安装

1
2
3
4
5
# Install RabbitMQ
$ sudo apt-get install rabbitmq-server

# Install Celery
$ pip3 install celery -i https://pypi.tuna.tsinghua.edu.cn/simple

工作流程

  1. 任务提交:应用调用 task.delay(args) 方法,将任务发送到 Broker;
  2. 任务入队:Broker 接收任务并放入队列中等待处理;
  3. 任务消费:Worker 监听到队列中有新任务,取出并执行;
  4. 结果返回(可选):若配置了 Result Backend,则任务执行结果会被写入指定存储;
  5. 结果查询(可选):生产者可通过任务 ID 查询执行状态和结果。

注意:在 Celery 中,一个 Worker 实例通常会管理多个进程或线程,这意味着它可以同时执行多个任务。

关键组件

Celery 的核心架构由以下几个关键组件组成:

  1. Producer(生产者)

也称为客户端(Client),是任务的发起方。通常是一个 Web 应用(如 Django 或 Flask),当有需要异步处理的操作时,它会向 Broker 发送一个任务消息。

  1. Broker(消息中间件)

负责接收来自生产者的任务消息,并将其排队等待 Worker 处理。常用的 Broker 包括:

  • RabbitMQ:功能强大、稳定性高,适合生产环境。
  • Redis:性能优异,支持发布/订阅机制,也常用于缓存。
  • 其他支持的包括 Amazon SQS、ZooKeeper、MongoDB 等。
  1. Worker(消费者)

Worker 是运行在独立进程或机器上的任务执行单元。它们持续监听 Broker 中的任务队列,一旦发现新任务,就会取出并执行。一个系统中可以部署多个 Worker,实现横向扩展,提升任务处理能力。

  1. Task(任务)

任务是 Celery 执行的基本单位,通常是一个带有 @app.task 装饰器的 Python 函数。它可以接受参数,返回结果,并支持重试、延迟、优先级等功能。

  1. Result Backend(结果后端,可选)

如果需要获取任务执行的结果,可以通过配置 Result Backend 来存储任务状态和返回值。常用方案包括 Redis、RabbitMQ(RPC 模式)、数据库等。

QuickStart

完整的 Celery 异步任务处理流程,包括:

  • 定义 Celery 实例与异步任务
  • 启动 Celery Worker 监听队列
  • 调用异步任务并获取结果

定义 Celery 实例与异步任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# celery_app.py
from celery import Celery

# 初始化 Celery 应用实例
# broker: 使用 RabbitMQ 作为消息代理
# result_backend: 可选,用于存储任务执行结果(这里使用 RPC 返回)
app = Celery(
'tasks',
broker='amqp://guest@localhost//', # RabbitMQ 默认连接地址
backend='rpc://' # 使用 RabbitMQ 的 RPC 方式返回任务结果
)

# 示例任务 1:无参数打印 Hello
@app.task
def print_hello():
print("hello Celery")

# 示例任务 2:两个数字相乘
@app.task
def add_numbers(num1, num2):
"""
接收两个数字,返回它们的乘积。
"""
return num1 * num2

# 示例任务 3:接收多个数字列表,返回总和
@app.task
def add_numbers2(**kwargs):
"""
接收关键字参数中的 numbers 列表,返回其总和。

示例调用:
add_numbers2.delay(numbers=[10, 20, 30])
"""
assert 'numbers' in kwargs.keys(), "必须传入 key 为 'numbers' 的参数"
return sum(kwargs['numbers'])

注意:@app.task必须放在函数定义的最上层装饰器位置

启动 Celery Worker 监听队列

1
$ celery -A celery_app worker --loglevel=info
  • -A celery_app:指定 Celery 实例所在的模块名(即 celery_app.py
  • worker:启动一个 Worker 进程来消费任务
  • --loglevel=info:设置日志级别为 info

调用异步任务并获取结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# run_task.py
from celery_worker import print_hello, add_numbers, add_numbers2

# 测试 1: print_hello()
print("=== 测试 print_hello ===")
result_hello = print_hello.delay()
print("任务ID:", result_hello.id)
print("任务结果:", result_hello.get(timeout=10)) # 如果没有返回值,结果为 None

# 测试 2: add_numbers(num1, num2)
print("\n=== 测试 add_numbers ===")
numbers_add = [10, 20]
result_add = add_numbers.delay(*numbers_add) # 解包传参
print("任务ID:", result_add.id)
print("任务结果:", result_add.get(timeout=10))

# 测试 3: add_numbers2(**kwargs)
print("\n=== 测试 add_numbers2 ===")
result_add2 = add_numbers2.delay(numbers=[5, 15, 30]) # 以字典形式传递参数
print("任务ID:", result_add2.id)
print("任务结果:", result_add2.get(timeout=10))

执行测试

启动Celery,然后运行测试程序,执行日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
$ celery -A celery_app worker --loglevel=info

-------------- celery@LAPTOP-S3BIPLGN v5.5.3 (immunity)
--- ***** -----
-- ******* ---- Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.35 2025-06-28 21:38:42
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x76c43b4f90d0
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 22 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. celery_app.add_numbers
. celery_app.add_numbers2
. celery_app.print_hello

[2025-06-28 21:38:42,429: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2025-06-28 21:38:42,432: INFO/MainProcess] mingle: searching for neighbors
[2025-06-28 21:38:43,455: INFO/MainProcess] mingle: all alone
[2025-06-28 21:38:43,468: INFO/MainProcess] celery@LAPTOP-S3BIPLGN ready.
[2025-06-28 21:39:14,508: INFO/MainProcess] Task celery_app.print_hello[cfa60a5a-ebb1-40d1-93e1-288f9000c2db] received
[2025-06-28 21:39:14,509: WARNING/ForkPoolWorker-16] hello Celery
[2025-06-28 21:39:14,516: INFO/ForkPoolWorker-16] Task celery_app.print_hello[cfa60a5a-ebb1-40d1-93e1-288f9000c2db] succeeded in 0.006867269999929704s: None
[2025-06-28 21:39:14,518: INFO/MainProcess] Task celery_app.add_numbers[da6053cb-90bf-4343-953e-43c29362795a] received
[2025-06-28 21:39:14,519: INFO/ForkPoolWorker-16] Task celery_app.add_numbers[da6053cb-90bf-4343-953e-43c29362795a] succeeded in 0.0003271770001447294s: 200
[2025-06-28 21:39:14,521: INFO/MainProcess] Task celery_app.add_numbers2[7e828bbb-3026-415c-bf1c-c15570be5176] received
[2025-06-28 21:39:14,522: INFO/ForkPoolWorker-16] Task celery_app.add_numbers2[7e828bbb-3026-415c-bf1c-c15570be5176] succeeded in 0.00023004600006970577s: 50

$ python3 run_task.py
=== 测试 print_hello ===
任务ID: cfa60a5a-ebb1-40d1-93e1-288f9000c2db
任务结果: None

=== 测试 add_numbers ===
任务ID: da6053cb-90bf-4343-953e-43c29362795a
任务结果: 200

=== 测试 add_numbers2 ===
任务ID: 7e828bbb-3026-415c-bf1c-c15570be5176
任务结果: 50

高级设置

delay vs. apply_async

1
2
delay(*args, **kwargs)
apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)[source]

delayapply_async的简易版本,比较apply_async会少一些输入选项。它们的目的都是为了发送消息给异步应用任务

1
2
3
4
5
6
7
8
9
10
11
12
def delay(self, *args, **kwargs):
"""Star argument version of :meth:`apply_async`.

Does not support the extra options enabled by :meth:`apply_async`.

Arguments:
*args (Any): Positional arguments passed on to the task.
**kwargs (Any): Keyword arguments passed on to the task.
Returns:
celery.result.AsyncResult: Future promise.
"""
return self.apply_async(args, kwargs)

配置文件

Celery 支持通过加载配置文件的方式来定义各种配置选项。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# celery_config.py

# 指定消息代理(Broker)的地址,这里使用的是 RabbitMQ
# 格式为:amqp://<用户名>:<密码>@<主机>/<虚拟主机>
broker_url = 'amqp://guest@localhost//'

# 指定结果后端(Backend)的地址,这里也使用 RabbitMQ 的 RPC 协议
result_backend = 'rpc://'

# 配置任务的序列化方式,推荐使用 JSON 格式
task_serializer = 'json'

# 指定接受的内容类型,确保只处理 JSON 格式的任务
accept_content = ['json']

# 配置任务执行结果的序列化方式,保持与任务一致
result_serializer = 'json'

# 时区设置,默认为 UTC,可以根据项目需求调整
# 注意:如果启用时区支持,需同时设置 enable_utc 为 True
timezone = 'Asia/Shanghai' # 示例:设置为中国上海时区
enable_utc = False # 如果不希望强制使用 UTC 时间,可以设置为 False

在 Celery 实例初始化时,可以通过 config_from_object 方法加载上述配置文件:

1
2
3
4
5
# celery_app.py
from celery import Celery

app = Celery("tasks")
app.config_from_object('celery_config')

多文件定义

Celery允许在不同文件中定义各自的异步任务。示例如下:

主文件:celery_app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# celery_app.py
from celery import Celery

def make_celery_instance():
"""
创建并配置 Celery 实例
"""
celery_instance = Celery("tasks")
celery_instance.conf.update(
include=[
'celery_print_hello',
'celery_add_numbers',
'celery_add_numbers2'
] # 确保路径正确指向你的任务文件
)
celery_instance.config_from_object("celery_config")
return celery_instance


# Celery 实例
celery_instance = make_celery_instance()

任务文件 1:celery_print_hello.py

1
2
3
4
5
6
7
8
9
# celery_print_hello.py
from celery_app import celery_instance

@app.task(name='print_hello')
def print_hello():
"""
示例任务 1:无参数打印 Hello
"""
print("hello Celery")

任务文件 2:celery_add_numbers.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# celery_add_numbers.py
from celery_app import celery_instance

@app.task(name='add_numbers')
def add_numbers(num1, num2):
"""
示例任务 2:两个数字相乘

参数:
num1 (int): 第一个数字
num2 (int): 第二个数字

返回:
int: 两数相乘的结果
"""
return num1 * num2

任务文件 3:celery_add_numbers2.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# celery_add_numbers2.py
from celery_app import celery_instance

@app.task(name='add_numbers2')
def add_numbers2(**kwargs):
"""
示例任务 3:接收多个数字列表,返回总和

参数:
kwargs (dict): 包含关键字参数的字典,必须包含 key 为 'numbers' 的列表

返回:
int: 数字列表的总和

示例调用:
add_numbers2.delay(numbers=[10, 20, 30])
"""
assert 'numbers' in kwargs.keys(), "必须传入 key 为 'numbers' 的参数"
return sum(kwargs['numbers'])

💡 注意事项:

  • 在每个任务文件中导入 celery_instance,并在任务函数上应用 @app.task 装饰器。
  • 确保所有任务文件的路径正确,且被包含在 celery_app.py 中的 include 列表中。
  • 使用 name 参数为任务指定名称,以便更好地管理和监控任务。

Celery Hook

Celery 提供了丰富的钩子(Hook)机制,允许开发者在任务执行生命周期中的关键节点插入自定义逻辑。常用钩子有:

  1. worker_process_init

触发时机:当每个 Worker 的子进程被创建并启动时调用
用途:适合用于数据库连接池初始化、缓存客户端连接等

1
2
3
4
5
from celery import signals

@signals.worker_process_init.connect
def on_worker_init(**kwargs):
print("Worker 子进程已启动,可以在此初始化资源,如数据库连接")
  1. worker_shutting_down

触发时机:Worker 即将关闭时调用
用途:用于释放资源、保存状态、关闭连接等

1
2
3
@signals.worker_shutting_down.connect
def on_worker_shutdown(**kwargs):
print("Worker 正在关闭,可以在此释放资源")
  1. task_prerun

触发时机:任务开始执行之前
用途:适合用于记录任务启动时间、验证输入参数、初始化上下文等

1
2
3
4
5
from celery import signals

@signals.task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **kw):
print(f"【Pre-run】Task {task.name} ({task_id}) 正在启动,参数: {args}, {kwargs}")
  1. task_postrun

触发时机:任务执行完成后
用途:适合用于清理资源、记录任务结束时间、处理结果等

1
2
3
@signals.task_postrun.connect
def task_postrun_handler(task_id, task, retval, state, args, kwargs, **kw):
print(f"【Post-run】Task {task.name} ({task_id}) 已完成,状态: {state}, 返回值: {retval}")

测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# celery_app.py
from celery import Celery, signals

app = Celery('tasks', broker='amqp://guest@localhost//')


# 钩子函数:Worker 子进程初始化
@signals.worker_process_init.connect
def on_worker_init(**kwargs):
print("【Hook】Worker 子进程已启动,正在初始化资源...")


# 钩子函数:Worker 正在关闭
@signals.worker_shutting_down.connect
def on_worker_shutdown(**kwargs):
print("【Hook】Worker 正在关闭,正在释放资源...")


# 钩子函数:任务开始执行前
@signals.task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **kw):
print(f"【Pre-run】Task {task.name} ({task_id}) 正在启动,参数: {args}, {kwargs}")


# 钩子函数:任务执行完成后
@signals.task_postrun.connect
def task_postrun_handler(task_id, task, retval, state, args, kwargs, **kw):
print(f"【Post-run】Task {task.name} ({task_id}) 已完成,状态: {state}, 返回值: {retval}")


# 示例任务
@app.task
def add(x, y):
return x + y


@app.task
def multiply(x, y):
return x * y

启动 Worker:

1
celery -A celery_app worker --loglevel=info

测试代码如下:

1
2
3
4
5
6
7
8
# run_task.py
from celery_app import add, multiply

result_add = add.delay(4, 6)
print(f"Add Task ID: {result_add.id}")

result_multiply = multiply.delay(5, 7)
print(f"Multiply Task ID: {result_multiply.id}")

输出日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
$ celery -A celery_app worker --loglevel=info

-------------- celery@LAPTOP-S3BIPLGN v5.5.3 (immunity)
--- ***** -----
-- ******* ---- Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.35 2025-06-28 18:19:21
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7cbcadfcff70
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 22 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. celery_app.add
. celery_app.multiply

[2025-06-28 18:19:21,656: WARNING/ForkPoolWorker-1] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,673: WARNING/ForkPoolWorker-2] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,688: WARNING/ForkPoolWorker-3] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,703: WARNING/ForkPoolWorker-4] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,719: WARNING/ForkPoolWorker-5] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,735: WARNING/ForkPoolWorker-6] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,754: WARNING/ForkPoolWorker-7] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,770: WARNING/ForkPoolWorker-8] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,785: WARNING/ForkPoolWorker-9] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,799: WARNING/ForkPoolWorker-10] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,814: WARNING/ForkPoolWorker-11] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,832: WARNING/ForkPoolWorker-12] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,846: WARNING/ForkPoolWorker-13] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,864: WARNING/ForkPoolWorker-14] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,881: WARNING/ForkPoolWorker-15] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,899: WARNING/ForkPoolWorker-16] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,918: WARNING/ForkPoolWorker-17] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,935: WARNING/ForkPoolWorker-18] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,952: WARNING/ForkPoolWorker-19] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,974: WARNING/ForkPoolWorker-20] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:21,994: WARNING/ForkPoolWorker-21] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:22,010: WARNING/ForkPoolWorker-22] 【Hook】Worker 子进程已启动,正在初始化资源...
[2025-06-28 18:19:22,020: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2025-06-28 18:19:22,023: INFO/MainProcess] mingle: searching for neighbors
[2025-06-28 18:19:23,052: INFO/MainProcess] mingle: all alone
[2025-06-28 18:19:23,062: INFO/MainProcess] celery@LAPTOP-S3BIPLGN ready.
[2025-06-28 18:19:48,970: INFO/MainProcess] Task celery_app.add[ac943749-c241-48b3-8a50-8612094223e5] received
[2025-06-28 18:19:48,970: INFO/MainProcess] Task celery_app.multiply[ede8b30f-c1c2-4d86-b50d-11099fd2d56d] received
[2025-06-28 18:19:48,971: WARNING/ForkPoolWorker-16] 【Pre-run】Task celery_app.add (ac943749-c241-48b3-8a50-8612094223e5) 正在启动,参数: [4, 6], {}
[2025-06-28 18:19:48,971: WARNING/ForkPoolWorker-1] 【Pre-run】Task celery_app.multiply (ede8b30f-c1c2-4d86-b50d-11099fd2d56d) 正在启动,参数: [5, 7], {}
[2025-06-28 18:19:48,971: INFO/ForkPoolWorker-1] Task celery_app.multiply[ede8b30f-c1c2-4d86-b50d-11099fd2d56d] succeeded in 0.0004443160014488967s: 35
[2025-06-28 18:19:48,971: INFO/ForkPoolWorker-16] Task celery_app.add[ac943749-c241-48b3-8a50-8612094223e5] succeeded in 0.00045413700172503013s: 10
[2025-06-28 18:19:48,971: WARNING/ForkPoolWorker-16] 【Post-run】Task celery_app.add (ac943749-c241-48b3-8a50-8612094223e5) 已完成,状态: SUCCESS, 返回值: 10
[2025-06-28 18:19:48,971: WARNING/ForkPoolWorker-1] 【Post-run】Task celery_app.multiply (ede8b30f-c1c2-4d86-b50d-11099fd2d56d) 已完成,状态: SUCCESS, 返回值: 35

$ python3 run_task.py
Add Task ID: ac943749-c241-48b3-8a50-8612094223e5
Multiply Task ID: ede8b30f-c1c2-4d86-b50d-11099fd2d56d

任务监控

在 Celery 中,当我们通过 delay()apply_async() 向异步任务发送请求后,会返回一个 AsyncResult 实例。这个对象包含了任务的唯一标识(UUID)以及用于查询任务状态的结果后端(backend),可以实现任务监控、状态追踪和结果获取。

1
2
3
4
5
6
7
8
from celery_app import add_numbers

# 使用 apply_async() 发送任务
task_data = {'num1': 10, 'num2': 20}
celery_task = add_numbers.apply_async(kwargs=task_data)

# 打印任务ID
print("任务ID:", celery_task.id)

常用的 AsyncResult 方法有

  • ready():如果任务已执行,则返回True;如果任务仍在运行、挂起或等待重试,则返回False。
  • successful():返回True表示任务成功执行。
  • failed():返回True表示任务执行失败
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
from celery_app import add_numbers

# 发起异步任务
task_data = {'num1': 10, 'num2': 20}
celery_task = add_numbers.apply_async(kwargs=task_data)

print(f"任务已提交,任务ID: {celery_task.id}")

# 轮询任务状态
while not celery_task.ready():
print("任务仍在运行或等待中...")
time.sleep(1) # 每隔1秒检查一次

# 任务完成后输出最终状态
if celery_task.successful():
print("✅ 任务成功完成!")
print("任务结果:", celery_task.result)
elif celery_task.failed():
print("❌ 任务执行失败!")
print("错误信息:", celery_task.traceback)

注意:要使用 ready()、successful()、get() 等功能,必须在 Celery 配置中启用 result_backend。比如

1
app.conf.result_backend = 'rpc://'  # 或 redis://localhost/0

相关阅读