在高并发或分布式系统中,异步队列和任务调度系统用于管理和调度任务,尤其是在需要分布式执行和异步处理时。使用队列可以确保任务的顺序执行和负载均衡。
Python 提供了多种工具来实现异步队列和任务调度,asyncio 和 Celery 是最常见的选择。
12.1 使用 asyncio 实现异步队列
asyncio.Queue 是 Python 原生支持的异步队列,用于在任务间传递数据。它基于 asyncio 事件循环,并且支持异步的 put() 和 get() 操作。
12.1.1 异步队列基本使用
生产者与消费者模式
- 生产者将任务放入队列中。
- 消费者从队列中获取任务并执行。
import asyncio
async def producer(queue):
for i in range(5):
await asyncio.sleep(1) # 模拟任务生产
await queue.put(i)
print(f"生产任务 {i}")
async def consumer(queue):
while True:
task = await queue.get()
if task is None:
break # 用 None 作为结束标志
print(f"消费任务 {task}")
await asyncio.sleep(2) # 模拟任务处理
queue.task_done()
async def main():
queue = asyncio.Queue()
prod = asyncio.create_task(producer(queue))
cons = asyncio.create_task(consumer(queue))
await prod
await queue.join() # 等待所有任务完成
cons.cancel() # 取消消费者任务
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
12.1.2 异步队列的特点
- 异步队列不阻塞主线程,可以实现高效的任务调度。
- 适用于需要异步处理的场景,如多个任务的分发与执行。
- 支持队列长度限制,可以避免任务堆积。
12.2 使用 Celery 实现任务队列和调度系统
Celery 是一个分布式任务队列,它非常适用于需要异步任务处理的场景,并且支持定时任务调度。
12.2.1 安装和配置 Celery
首先,安装 Celery:
pip install celery
然后,配置一个简单的 Celery 应用:
任务定义
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
在这里,我们定义了一个简单的任务 add,它接收两个参数并返回它们的和。
12.2.2 运行 Celery worker
在命令行启动 Celery worker:
celery -A tasks worker --loglevel=info
Celery worker 会监听 Redis 中的任务队列,等待接收和执行任务。
12.2.3 调用任务
通过 Celery 发送异步任务:
from tasks import add
result = add.delay(4, 6) # 发送异步任务
print(f"任务返回值: {result.get(timeout=10)}") # 获取任务结果
add.delay(4, 6) 会将任务 add 异步发送到 Celery 队列中进行执行,result.get(timeout=10) 会等待任务的结果返回。
12.2.4 定时任务
Celery 还支持定时任务调度,可以使用 celery.beat 实现任务定时执行。配置 Celery Beat 和 Celery Worker 的调度功能。
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-every-10-seconds': {
'task': 'tasks.add',
'schedule': crontab(minute='*/1'), # 每分钟执行一次
'args': (16, 16),
},
}
app.conf.timezone = 'UTC'
12.3 高级用法:任务优先级和结果存储
在 Celery 中,可以设置任务优先级,并且存储任务结果以供后续查询。
12.3.1 任务优先级
Celery 支持任务的优先级队列配置,可以控制任务的执行顺序。使用 priority 参数设置任务优先级:
from tasks import add
result = add.apply_async((4, 6), priority=10) # 设置优先级
12.3.2 任务结果存储
Celery 还可以将任务结果存储到数据库或缓存系统中(如 Redis、MongoDB)。这对于后续的任务结果查询非常有用。
app.conf.result_backend = 'redis://localhost:6379/0'
12.4 任务调度和队列优化
12.4.1 优化任务队列
- 任务分配:通过设置优先级,可以确保关键任务得到优先处理。
- 负载均衡:可以通过 Celery 的自动调度系统(如 Flower)监控任务负载,确保任务分配均衡。
- 限流控制:限制任务的并发量,避免系统过载,确保任务按顺序有序执行。
12.4.2 任务调度
通过调度器(如 Celery Beat),可以灵活地安排周期任务、定时任务等,减少人工干预,提高自动化程度。
总结:
asyncio.Queue适合于单机高并发的异步队列,能够在内存中进行高效的任务分配和处理。Celery则是一个更适合大规模分布式系统的任务队列,支持优先级队列、任务结果存储、任务调度等功能,非常适合分布式任务处理和定时任务。
下篇文章我们将探讨如何构建高效的异步 Web 服务和 API!