Celery 是一个使用 Python 开发的分布式任务队列框架,广泛用于处理异步任务 和定时任务 。它通过消息中间件(Broker)将耗时或复杂的任务分发给一个或多个工作节点(Worker)进行异步执行,适用于高并发、任务解耦、批量数据处理、后台服务等场景。
Celery解析
Celery/RabbitMQ安装
1 2 3 4 5 # Install RabbitMQ $ sudo apt-get install rabbitmq-server# Install Celery $ pip3 install celery -i https://pypi.tuna.tsinghua.edu.cn/simple
工作流程
任务提交 :应用调用 task.delay(args)
方法,将任务发送到 Broker;
任务入队 :Broker 接收任务并放入队列中等待处理;
任务消费 :Worker 监听到队列中有新任务,取出并执行;
结果返回(可选) :若配置了 Result Backend,则任务执行结果会被写入指定存储;
结果查询(可选) :生产者可通过任务 ID 查询执行状态和结果。
注意:在 Celery 中,一个 Worker 实例通常会管理多个进程或线程,这意味着它可以同时执行多个任务。
关键组件
Celery 的核心架构由以下几个关键组件组成:
Producer(生产者)
也称为客户端(Client),是任务的发起方。通常是一个 Web 应用(如 Django 或 Flask),当有需要异步处理的操作时,它会向 Broker 发送一个任务消息。
Broker(消息中间件)
负责接收来自生产者的任务消息,并将其排队等待 Worker 处理。常用的 Broker 包括:
RabbitMQ :功能强大、稳定性高,适合生产环境。
Redis :性能优异,支持发布/订阅机制,也常用于缓存。
其他支持的包括 Amazon SQS、ZooKeeper、MongoDB 等。
Worker(消费者)
Worker 是运行在独立进程或机器上的任务执行单元。它们持续监听 Broker 中的任务队列,一旦发现新任务,就会取出并执行。一个系统中可以部署多个 Worker,实现横向扩展,提升任务处理能力。
Task(任务)
任务是 Celery 执行的基本单位,通常是一个带有 @app.task
装饰器的 Python 函数。它可以接受参数,返回结果,并支持重试、延迟、优先级等功能。
Result Backend(结果后端,可选)
如果需要获取任务执行的结果,可以通过配置 Result Backend 来存储任务状态和返回值。常用方案包括 Redis、RabbitMQ(RPC 模式)、数据库等。
QuickStart
完整的 Celery 异步任务处理流程,包括:
定义 Celery 实例与异步任务
启动 Celery Worker 监听队列
调用异步任务并获取结果
定义 Celery 实例与异步任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 # celery_app.py from celery import Celery # 初始化 Celery 应用实例 # broker: 使用 RabbitMQ 作为消息代理 # result_backend: 可选,用于存储任务执行结果(这里使用 RPC 返回) app = Celery( 'tasks', broker='amqp://guest@localhost//', # RabbitMQ 默认连接地址 backend='rpc://' # 使用 RabbitMQ 的 RPC 方式返回任务结果 ) # 示例任务 1:无参数打印 Hello @app.task def print_hello(): print("hello Celery") # 示例任务 2:两个数字相乘 @app.task def add_numbers(num1, num2): """ 接收两个数字,返回它们的乘积。 """ return num1 * num2 # 示例任务 3:接收多个数字列表,返回总和 @app.task def add_numbers2(**kwargs): """ 接收关键字参数中的 numbers 列表,返回其总和。 示例调用: add_numbers2.delay(numbers=[10, 20, 30]) """ assert 'numbers' in kwargs.keys(), "必须传入 key 为 'numbers' 的参数" return sum(kwargs['numbers'])
注意:@app.task
必须放在函数定义的最上层装饰器位置
启动 Celery Worker 监听队列
1 $ celery -A celery_app worker --loglevel=info
-A celery_app
:指定 Celery 实例所在的模块名(即 celery_app.py
)
worker
:启动一个 Worker 进程来消费任务
--loglevel=info
:设置日志级别为 info
调用异步任务并获取结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # run_task.py from celery_worker import print_hello, add_numbers, add_numbers2 # 测试 1: print_hello() print("=== 测试 print_hello ===") result_hello = print_hello.delay() print("任务ID:", result_hello.id) print("任务结果:", result_hello.get(timeout=10)) # 如果没有返回值,结果为 None # 测试 2: add_numbers(num1, num2) print("\n=== 测试 add_numbers ===") numbers_add = [10, 20] result_add = add_numbers.delay(*numbers_add) # 解包传参 print("任务ID:", result_add.id) print("任务结果:", result_add.get(timeout=10)) # 测试 3: add_numbers2(**kwargs) print("\n=== 测试 add_numbers2 ===") result_add2 = add_numbers2.delay(numbers=[5, 15, 30]) # 以字典形式传递参数 print("任务ID:", result_add2.id) print("任务结果:", result_add2.get(timeout=10))
执行测试
启动Celery,然后运行测试程序,执行日志如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 $ celery -A celery_app worker --loglevel=info -------------- celery@LAPTOP-S3BIPLGN v5.5.3 (immunity) --- ***** ----- -- ******* ---- Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.35 2025-06-28 21:38:42 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: tasks:0x76c43b4f90d0 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: rpc:// - *** --- * --- .> concurrency: 22 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . celery_app.add_numbers . celery_app.add_numbers2 . celery_app.print_hello [2025-06-28 21:38:42,429: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672// [2025-06-28 21:38:42,432: INFO/MainProcess] mingle: searching for neighbors [2025-06-28 21:38:43,455: INFO/MainProcess] mingle: all alone [2025-06-28 21:38:43,468: INFO/MainProcess] celery@LAPTOP-S3BIPLGN ready. [2025-06-28 21:39:14,508: INFO/MainProcess] Task celery_app.print_hello[cfa60a5a-ebb1-40d1-93e1-288f9000c2db] received [2025-06-28 21:39:14,509: WARNING/ForkPoolWorker-16] hello Celery [2025-06-28 21:39:14,516: INFO/ForkPoolWorker-16] Task celery_app.print_hello[cfa60a5a-ebb1-40d1-93e1-288f9000c2db] succeeded in 0.006867269999929704s: None [2025-06-28 21:39:14,518: INFO/MainProcess] Task celery_app.add_numbers[da6053cb-90bf-4343-953e-43c29362795a] received [2025-06-28 21:39:14,519: INFO/ForkPoolWorker-16] Task celery_app.add_numbers[da6053cb-90bf-4343-953e-43c29362795a] succeeded in 0.0003271770001447294s: 200 [2025-06-28 21:39:14,521: INFO/MainProcess] Task celery_app.add_numbers2[7e828bbb-3026-415c-bf1c-c15570be5176] received [2025-06-28 21:39:14,522: INFO/ForkPoolWorker-16] Task celery_app.add_numbers2[7e828bbb-3026-415c-bf1c-c15570be5176] succeeded in 0.00023004600006970577s: 50 $ python3 run_task.py === 测试 print_hello === 任务ID: cfa60a5a-ebb1-40d1-93e1-288f9000c2db 任务结果: None === 测试 add_numbers === 任务ID: da6053cb-90bf-4343-953e-43c29362795a 任务结果: 200 === 测试 add_numbers2 === 任务ID: 7e828bbb-3026-415c-bf1c-c15570be5176 任务结果: 50
高级设置
delay vs. apply_async
1 2 delay(*a rgs, **kwargs) apply_async(args =None, kwargs =None, task_id =None, producer =None, link =None, link_error =None, shadow =None, **options)[source]
delay
是apply_async
的简易版本,比较apply_async
会少一些输入选项。它们的目的都是为了发送消息给异步应用任务
1 2 3 4 5 6 7 8 9 10 11 12 def delay (self, *args, **kwargs ): """Star argument version of :meth:`apply_async`. Does not support the extra options enabled by :meth:`apply_async`. Arguments: *args (Any): Positional arguments passed on to the task. **kwargs (Any): Keyword arguments passed on to the task. Returns: celery.result.AsyncResult: Future promise. """ return self .apply_async(args, kwargs)
配置文件
Celery 支持通过加载配置文件的方式来定义各种配置选项。示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 broker_url = 'amqp://guest@localhost//' result_backend = 'rpc://' task_serializer = 'json' accept_content = ['json' ] result_serializer = 'json' timezone = 'Asia/Shanghai' enable_utc = False
在 Celery 实例初始化时,可以通过 config_from_object
方法加载上述配置文件:
1 2 3 4 5 # celery_app.py from celery import Celeryapp = Celery("tasks" ) app.config_from_object('celery_config' )
多文件定义
Celery允许在不同文件中定义各自的异步任务。示例如下:
主文件:celery_app.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from celery import Celerydef make_celery_instance (): """ 创建并配置 Celery 实例 """ celery_instance = Celery("tasks" ) celery_instance.conf.update( include=[ 'celery_print_hello' , 'celery_add_numbers' , 'celery_add_numbers2' ] ) celery_instance.config_from_object("celery_config" ) return celery_instance celery_instance = make_celery_instance()
任务文件 1:celery_print_hello.py
1 2 3 4 5 6 7 8 9 from celery_app import celery_instance@app.task(name='print_hello' ) def print_hello (): """ 示例任务 1:无参数打印 Hello """ print ("hello Celery" )
任务文件 2:celery_add_numbers.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from celery_app import celery_instance@app.task(name='add_numbers' ) def add_numbers (num1, num2 ): """ 示例任务 2:两个数字相乘 参数: num1 (int): 第一个数字 num2 (int): 第二个数字 返回: int: 两数相乘的结果 """ return num1 * num2
任务文件 3:celery_add_numbers2.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from celery_app import celery_instance@app.task(name='add_numbers2' ) def add_numbers2 (**kwargs ): """ 示例任务 3:接收多个数字列表,返回总和 参数: kwargs (dict): 包含关键字参数的字典,必须包含 key 为 'numbers' 的列表 返回: int: 数字列表的总和 示例调用: add_numbers2.delay(numbers=[10, 20, 30]) """ assert 'numbers' in kwargs.keys(), "必须传入 key 为 'numbers' 的参数" return sum (kwargs['numbers' ])
💡 注意事项:
在每个任务文件中导入 celery_instance
,并在任务函数上应用 @app.task
装饰器。
确保所有任务文件的路径正确,且被包含在 celery_app.py
中的 include
列表中。
使用 name
参数为任务指定名称,以便更好地管理和监控任务。
Celery Hook
Celery 提供了丰富的钩子(Hook)机制,允许开发者在任务执行生命周期中的关键节点插入自定义逻辑。常用钩子有:
worker_process_init
触发时机:当每个 Worker 的子进程被创建并启动时调用
用途:适合用于数据库连接池初始化、缓存客户端连接等
1 2 3 4 5 from celery import signals@signals.worker_process_init.connect def on_worker_init (**kwargs ): print ("Worker 子进程已启动,可以在此初始化资源,如数据库连接" )
worker_shutting_down
触发时机:Worker 即将关闭时调用
用途:用于释放资源、保存状态、关闭连接等
1 2 3 @signals.worker_shutting_down.connect def on_worker_shutdown(**kwargs): print("Worker 正在关闭,可以在此释放资源")
task_prerun
触发时机:任务开始执行之前
用途:适合用于记录任务启动时间、验证输入参数、初始化上下文等
1 2 3 4 5 from celery import signals@signals.task_prerun.connect def task_prerun_handler (task_id, task, args, kwargs, **kw ): print (f"【Pre-run】Task {task.name} ({task_id} ) 正在启动,参数: {args} , {kwargs} " )
task_postrun
触发时机:任务执行完成后
用途:适合用于清理资源、记录任务结束时间、处理结果等
1 2 3 @signals.task_postrun.connect def task_postrun_handler(task_id, task, retval, state, args, kwargs, **kw): print(f"【Post-run】Task {task.name} ({task_id}) 已完成,状态: {state}, 返回值: {retval}")
测试代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 from celery import Celery, signalsapp = Celery('tasks' , broker='amqp://guest@localhost//' ) @signals.worker_process_init.connect def on_worker_init (**kwargs ): print ("【Hook】Worker 子进程已启动,正在初始化资源..." ) @signals.worker_shutting_down.connect def on_worker_shutdown (**kwargs ): print ("【Hook】Worker 正在关闭,正在释放资源..." ) @signals.task_prerun.connect def task_prerun_handler (task_id, task, args, kwargs, **kw ): print (f"【Pre-run】Task {task.name} ({task_id} ) 正在启动,参数: {args} , {kwargs} " ) @signals.task_postrun.connect def task_postrun_handler (task_id, task, retval, state, args, kwargs, **kw ): print (f"【Post-run】Task {task.name} ({task_id} ) 已完成,状态: {state} , 返回值: {retval} " ) @app.task def add (x, y ): return x + y @app.task def multiply (x, y ): return x * y
启动 Worker:
1 celery -A celery_app worker --loglevel=info
测试代码如下:
1 2 3 4 5 6 7 8 from celery_app import add, multiplyresult_add = add.delay(4 , 6 ) print (f"Add Task ID: {result_add.id } " )result_multiply = multiply.delay(5 , 7 ) print (f"Multiply Task ID: {result_multiply.id } " )
输出日志如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 $ celery -A celery_app worker --loglevel=info -------------- celery@LAPTOP-S3BIPLGN v5.5.3 (immunity) --- ***** ----- -- ******* ---- Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.35 2025-06-28 18:19:21 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7cbcadfcff70 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: disabled:// - *** --- * --- .> concurrency: 22 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . celery_app.add . celery_app.multiply [2025-06-28 18:19:21,656: WARNING/ForkPoolWorker-1] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,673: WARNING/ForkPoolWorker-2] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,688: WARNING/ForkPoolWorker-3] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,703: WARNING/ForkPoolWorker-4] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,719: WARNING/ForkPoolWorker-5] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,735: WARNING/ForkPoolWorker-6] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,754: WARNING/ForkPoolWorker-7] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,770: WARNING/ForkPoolWorker-8] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,785: WARNING/ForkPoolWorker-9] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,799: WARNING/ForkPoolWorker-10] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,814: WARNING/ForkPoolWorker-11] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,832: WARNING/ForkPoolWorker-12] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,846: WARNING/ForkPoolWorker-13] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,864: WARNING/ForkPoolWorker-14] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,881: WARNING/ForkPoolWorker-15] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,899: WARNING/ForkPoolWorker-16] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,918: WARNING/ForkPoolWorker-17] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,935: WARNING/ForkPoolWorker-18] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,952: WARNING/ForkPoolWorker-19] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,974: WARNING/ForkPoolWorker-20] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:21,994: WARNING/ForkPoolWorker-21] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:22,010: WARNING/ForkPoolWorker-22] 【Hook】Worker 子进程已启动,正在初始化资源... [2025-06-28 18:19:22,020: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672// [2025-06-28 18:19:22,023: INFO/MainProcess] mingle: searching for neighbors [2025-06-28 18:19:23,052: INFO/MainProcess] mingle: all alone [2025-06-28 18:19:23,062: INFO/MainProcess] celery@LAPTOP-S3BIPLGN ready. [2025-06-28 18:19:48,970: INFO/MainProcess] Task celery_app.add[ac943749-c241-48b3-8a50-8612094223e5] received [2025-06-28 18:19:48,970: INFO/MainProcess] Task celery_app.multiply[ede8b30f-c1c2-4d86-b50d-11099fd2d56d] received [2025-06-28 18:19:48,971: WARNING/ForkPoolWorker-16] 【Pre-run】Task celery_app.add (ac943749-c241-48b3-8a50-8612094223e5) 正在启动,参数: [4, 6], {} [2025-06-28 18:19:48,971: WARNING/ForkPoolWorker-1] 【Pre-run】Task celery_app.multiply (ede8b30f-c1c2-4d86-b50d-11099fd2d56d) 正在启动,参数: [5, 7], {} [2025-06-28 18:19:48,971: INFO/ForkPoolWorker-1] Task celery_app.multiply[ede8b30f-c1c2-4d86-b50d-11099fd2d56d] succeeded in 0.0004443160014488967s: 35 [2025-06-28 18:19:48,971: INFO/ForkPoolWorker-16] Task celery_app.add[ac943749-c241-48b3-8a50-8612094223e5] succeeded in 0.00045413700172503013s: 10 [2025-06-28 18:19:48,971: WARNING/ForkPoolWorker-16] 【Post-run】Task celery_app.add (ac943749-c241-48b3-8a50-8612094223e5) 已完成,状态: SUCCESS, 返回值: 10 [2025-06-28 18:19:48,971: WARNING/ForkPoolWorker-1] 【Post-run】Task celery_app.multiply (ede8b30f-c1c2-4d86-b50d-11099fd2d56d) 已完成,状态: SUCCESS, 返回值: 35 $ python3 run_task.py Add Task ID: ac943749-c241-48b3-8a50-8612094223e5 Multiply Task ID: ede8b30f-c1c2-4d86-b50d-11099fd2d56d
任务监控
在 Celery 中,当我们通过 delay()
或 apply_async()
向异步任务发送请求后,会返回一个 AsyncResult
实例。这个对象包含了任务的唯一标识(UUID
)以及用于查询任务状态的结果后端(backend
),可以实现任务监控、状态追踪和结果获取。
1 2 3 4 5 6 7 8 from celery_app import add_numberstask_data = {'num1' : 10 , 'num2' : 20 } celery_task = add_numbers.apply_async(kwargs=task_data) print ("任务ID:" , celery_task.id )
常用的 AsyncResult
方法有
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import time from celery_app import add_numberstask_data = {'num1' : 10, 'num2' : 20} celery_task = add_numbers.apply_async(kwargs =task_data) print (f"任务已提交,任务ID: {celery_task.id}" )while not celery_task.ready(): print ("任务仍在运行或等待中..." ) time.sleep(1) # 每隔1秒检查一次 if celery_task.successful(): print ("✅ 任务成功完成!" ) print ("任务结果:" , celery_task.result) elif celery_task.failed(): print ("❌ 任务执行失败!" ) print ("错误信息:" , celery_task.traceback)
注意:要使用 ready()、successful()、get() 等功能,必须在 Celery 配置中启用 result_backend。比如
1 app.conf.result_backend = 'rpc://'
相关阅读