Python3 网络编程10:高效的异步数据库操作
                           
天天向上
发布: 2025-03-16 12:23:20

原创
53 人浏览过

在构建高性能的应用时,数据库操作的性能往往是瓶颈之一。通过异步数据库操作,可以大幅提高并发请求的处理能力。我们将介绍如何在异步编程中处理数据库操作,重点介绍如何使用 asyncio 和异步数据库库。


10.1 使用 aiomysql 进行 MySQL 异步操作

aiomysql 是一个与 asyncio 兼容的 MySQL 客户端库,可以实现异步数据库连接、查询和事务。

10.1.1 安装 aiomysql

pip install aiomysql

10.1.2 异步连接数据库

import aiomysql
import asyncio

async def test_connection():
    # 创建数据库连接池
    pool = await aiomysql.create_pool(
        host='localhost', port=3306,
        user='root', password='password',
        db='testdb', loop=loop
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT VERSION()")
            version = await cursor.fetchone()
            print("Database version:", version)

loop = asyncio.get_event_loop()
loop.run_until_complete(test_connection())

10.1.3 异步执行查询

async def execute_query():
    pool = await aiomysql.create_pool(
        host='localhost', port=3306,
        user='root', password='password',
        db='testdb', loop=loop
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT * FROM users")
            result = await cursor.fetchall()
            print("Query result:", result)

loop.run_until_complete(execute_query())

10.2 使用 asyncpg 进行 PostgreSQL 异步操作

asyncpg 是一个高性能的 PostgreSQL 客户端库,支持异步数据库操作。

10.2.1 安装 asyncpg

pip install asyncpg

10.2.2 异步连接 PostgreSQL

import asyncpg
import asyncio

async def test_connection():
    conn = await asyncpg.connect(user='postgres', password='password', database='testdb', host='127.0.0.1')
    version = await conn.fetch('SELECT version()')
    print("Database version:", version)
    await conn.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(test_connection())

10.2.3 异步执行查询

async def execute_query():
    conn = await asyncpg.connect(user='postgres', password='password', database='testdb', host='127.0.0.1')
    result = await conn.fetch('SELECT * FROM users')
    print("Query result:", result)
    await conn.close()

loop.run_until_complete(execute_query())

10.3 异步事务操作

数据库事务在多步骤操作中非常重要。异步事务允许我们在执行多个数据库操作时保持非阻塞。

10.3.1 使用 aiomysql 执行异步事务

async def execute_transaction():
    pool = await aiomysql.create_pool(
        host='localhost', port=3306,
        user='root', password='password',
        db='testdb', loop=loop
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await conn.begin()  # 开始事务
            try:
                await cursor.execute("UPDATE users SET balance = balance - 100 WHERE id = 1")
                await cursor.execute("UPDATE users SET balance = balance + 100 WHERE id = 2")
                await conn.commit()  # 提交事务
                print("事务提交成功")
            except Exception as e:
                await conn.rollback()  # 发生异常时回滚事务
                print("事务回滚:", e)

loop.run_until_complete(execute_transaction())

10.3.2 使用 asyncpg 执行异步事务

async def execute_transaction():
    conn = await asyncpg.connect(user='postgres', password='password', database='testdb', host='127.0.0.1')
    async with conn.transaction():  # 使用事务
        try:
            await conn.execute("UPDATE users SET balance = balance - 100 WHERE id = 1")
            await conn.execute("UPDATE users SET balance = balance + 100 WHERE id = 2")
            print("事务提交成功")
        except Exception as e:
            print("事务回滚:", e)
    await conn.close()

loop.run_until_complete(execute_transaction())

10.4 异步数据库连接池

使用连接池可以管理多个数据库连接,避免频繁创建和销毁连接,提高数据库访问效率。

10.4.1 使用 aiomysql 连接池

async def execute_with_pool():
    pool = await aiomysql.create_pool(
        host='localhost', port=3306,
        user='root', password='password',
        db='testdb', loop=loop
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT * FROM users")
            result = await cursor.fetchall()
            print("Query result:", result)

loop.run_until_complete(execute_with_pool())

10.4.2 使用 asyncpg 连接池

async def execute_with_pool():
    conn = await asyncpg.create_pool(user='postgres', password='password', database='testdb', host='127.0.0.1')
    async with conn.acquire() as connection:
        result = await connection.fetch('SELECT * FROM users')
        print("Query result:", result)

loop.run_until_complete(execute_with_pool())

10.5 性能优化与异步数据库操作

  • 批量操作:对于大规模的数据库查询,采用批量操作而不是单次执行多个查询,这样可以减少网络延迟。
  • 连接池:使用连接池来管理数据库连接,避免每次请求都重新创建连接。
  • 并发查询:利用 asyncio.gather() 或类似方法同时执行多个数据库查询,以提高吞吐量。

10.5.1 批量插入操作

async def bulk_insert():
    conn = await asyncpg.connect(user='postgres', password='password', database='testdb', host='127.0.0.1')
    await conn.executemany(
        'INSERT INTO users(name, balance) VALUES($1, $2)',
        [('Alice', 1000), ('Bob', 2000), ('Charlie', 1500)]
    )
    print("批量插入成功")
    await conn.close()

loop.run_until_complete(bulk_insert())

10.5.2 并发查询

async def concurrent_queries():
    conn = await asyncpg.connect(user='postgres', password='password', database='testdb', host='127.0.0.1')
    query1 = conn.fetch('SELECT * FROM users WHERE id=1')
    query2 = conn.fetch('SELECT * FROM users WHERE id=2')
    result1, result2 = await asyncio.gather(query1, query2)
    print(f"查询结果1: {result1}, 查询结果2: {result2}")
    await conn.close()

loop.run_until_complete(concurrent_queries())

总结

  • 异步数据库操作能够显著提高应用的并发处理能力,减少 I/O 阻塞。
  • 使用 aiomysqlasyncpg 等库,我们可以方便地进行异步 MySQL 和 PostgreSQL 操作。
  • 异步事务和连接池能够进一步提高操作效率和稳定性,适用于高并发场景。

下篇文章我们将讨论如何构建异步文件处理系统!

发表回复 0

Your email address will not be published. Required fields are marked *