Python3 网络编程12:异步队列和任务调度系统
                           
天天向上
发布: 2025-03-16 12:26:17

原创
135 人浏览过

在高并发或分布式系统中,异步队列和任务调度系统用于管理和调度任务,尤其是在需要分布式执行和异步处理时。使用队列可以确保任务的顺序执行和负载均衡。

Python 提供了多种工具来实现异步队列和任务调度,asyncioCelery 是最常见的选择。

12.1 使用 asyncio 实现异步队列

asyncio.Queue 是 Python 原生支持的异步队列,用于在任务间传递数据。它基于 asyncio 事件循环,并且支持异步的 put()get() 操作。

12.1.1 异步队列基本使用

生产者与消费者模式

  • 生产者将任务放入队列中。
  • 消费者从队列中获取任务并执行。
import asyncio

async def producer(queue):
    for i in range(5):
        await asyncio.sleep(1)  # 模拟任务生产
        await queue.put(i)
        print(f"生产任务 {i}")

async def consumer(queue):
    while True:
        task = await queue.get()
        if task is None:
            break  # 用 None 作为结束标志
        print(f"消费任务 {task}")
        await asyncio.sleep(2)  # 模拟任务处理
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    prod = asyncio.create_task(producer(queue))
    cons = asyncio.create_task(consumer(queue))
    await prod
    await queue.join()  # 等待所有任务完成
    cons.cancel()  # 取消消费者任务

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

12.1.2 异步队列的特点

  • 异步队列不阻塞主线程,可以实现高效的任务调度。
  • 适用于需要异步处理的场景,如多个任务的分发与执行。
  • 支持队列长度限制,可以避免任务堆积。

12.2 使用 Celery 实现任务队列和调度系统

Celery 是一个分布式任务队列,它非常适用于需要异步任务处理的场景,并且支持定时任务调度。

12.2.1 安装和配置 Celery

首先,安装 Celery:

pip install celery

然后,配置一个简单的 Celery 应用:

任务定义

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

在这里,我们定义了一个简单的任务 add,它接收两个参数并返回它们的和。

12.2.2 运行 Celery worker

在命令行启动 Celery worker:

celery -A tasks worker --loglevel=info

Celery worker 会监听 Redis 中的任务队列,等待接收和执行任务。

12.2.3 调用任务

通过 Celery 发送异步任务:

from tasks import add

result = add.delay(4, 6)  # 发送异步任务
print(f"任务返回值: {result.get(timeout=10)}")  # 获取任务结果

add.delay(4, 6) 会将任务 add 异步发送到 Celery 队列中进行执行,result.get(timeout=10) 会等待任务的结果返回。

12.2.4 定时任务

Celery 还支持定时任务调度,可以使用 celery.beat 实现任务定时执行。配置 Celery Beat 和 Celery Worker 的调度功能。

from celery.schedules import crontab

app.conf.beat_schedule = {
    'add-every-10-seconds': {
        'task': 'tasks.add',
        'schedule': crontab(minute='*/1'),  # 每分钟执行一次
        'args': (16, 16),
    },
}

app.conf.timezone = 'UTC'

12.3 高级用法:任务优先级和结果存储

在 Celery 中,可以设置任务优先级,并且存储任务结果以供后续查询。

12.3.1 任务优先级

Celery 支持任务的优先级队列配置,可以控制任务的执行顺序。使用 priority 参数设置任务优先级:

from tasks import add

result = add.apply_async((4, 6), priority=10)  # 设置优先级

12.3.2 任务结果存储

Celery 还可以将任务结果存储到数据库或缓存系统中(如 Redis、MongoDB)。这对于后续的任务结果查询非常有用。

app.conf.result_backend = 'redis://localhost:6379/0'

12.4 任务调度和队列优化

12.4.1 优化任务队列

  • 任务分配:通过设置优先级,可以确保关键任务得到优先处理。
  • 负载均衡:可以通过 Celery 的自动调度系统(如 Flower)监控任务负载,确保任务分配均衡。
  • 限流控制:限制任务的并发量,避免系统过载,确保任务按顺序有序执行。

12.4.2 任务调度

通过调度器(如 Celery Beat),可以灵活地安排周期任务、定时任务等,减少人工干预,提高自动化程度。


总结:

  • asyncio.Queue 适合于单机高并发的异步队列,能够在内存中进行高效的任务分配和处理。
  • Celery 则是一个更适合大规模分布式系统的任务队列,支持优先级队列、任务结果存储、任务调度等功能,非常适合分布式任务处理和定时任务。

下篇文章我们将探讨如何构建高效的异步 Web 服务和 API!

发表回复 0

Your email address will not be published. Required fields are marked *