Skip to content

数据库架构

异步引擎配置

基于 SQLAlchemy 2.0 的异步引擎,支持连接池管理和自动重连:

python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

engine = create_async_engine(
    settings.DATABASE_URL,
    echo=settings.DEBUG,          # 调试模式打印 SQL
    pool_size=5,                  # 连接池常驻连接数
    max_overflow=10,              # 超出 pool_size 可创建的连接数(总计最多 15)
    pool_timeout=30,              # 获取连接超时(秒)
    pool_recycle=600,             # 连接回收时间(秒)
    pool_pre_ping=True,           # 使用前检查连接有效性,自动重连
    pool_reset_on_return="rollback",  # 归还时重置状态
)

连接池参数说明

参数默认值说明
pool_size5常驻连接数
max_overflow10最大溢出连接数
pool_timeout30s获取连接超时时间
pool_recycle600s连接回收周期
pool_pre_pingTrue使用前检测连接有效性

连接池监控

通过 SQLAlchemy 事件监听连接池状态:

python
@event.listens_for(engine.sync_engine, "checkout")
def _on_checkout(dbapi_conn, connection_rec, connection_proxy):
    pool = engine.sync_engine.pool
    logger.debug(
        f"[Pool] checkout: size={pool.size()}, checkedin={pool.checkedin()}, "
        f"checkedout={pool.checkedout()}, overflow={pool.overflow()}"
    )

会话管理

提供三种会话获取方式,适用于不同场景:

1. 普通会话 get_db()

最常用的方式,每个请求获取独立会话,需要手动提交:

python
from app.database import get_db

@router.post("/")
async def create_item(db: AsyncSession = Depends(get_db)):
    item = MyModel(name="test")
    db.add(item)
    await db.commit()  # 需要手动提交
    return item

2. 事务会话 get_db_transaction()

自动管理事务,成功自动提交,异常自动回滚:

python
from app.database import get_db_transaction

@router.post("/")
async def create_items(db: AsyncSession = Depends(get_db_transaction)):
    # 所有操作在同一事务中
    await ServiceA.create(db, data1, auto_commit=False)
    await ServiceB.create(db, data2, auto_commit=False)
    # 自动提交或回滚,无需手动调用 commit()

注意

使用 get_db_transaction 时,Service 方法应设置 auto_commit=False,避免重复提交。

3. 事务上下文管理器 transaction()

在普通会话中包装事务:

python
from app.database import get_db, transaction

@router.post("/")
async def create_items(db: AsyncSession = Depends(get_db)):
    async with transaction(db):
        await ServiceA.create(db, data1, auto_commit=False)
        await ServiceB.create(db, data2, auto_commit=False)
        # 退出 with 块时自动提交或回滚

多数据库支持

通过 DB_TYPE 配置切换数据库:

ini
# PostgreSQL(推荐)
DB_TYPE=postgresql
DB_HOST=localhost
DB_PORT=5432

# MySQL
DB_TYPE=mysql
DB_HOST=localhost
DB_PORT=3306

# SQL Server
DB_TYPE=sqlserver
DB_HOST=localhost
DB_PORT=1433

自动拼接对应的连接 URL:

DB_TYPE驱动URL 格式
postgresqlasyncpgpostgresql+asyncpg://user:pass@host:port/db
mysqlaiomysqlmysql+aiomysql://user:pass@host:port/db?charset=utf8mb4
sqlserveraioodbcmssql+aioodbc://user:pass@host:port/db?driver=ODBC+Driver+18

数据库迁移

使用 Alembic 管理数据库迁移:

bash
# 生成迁移脚本(自动检测模型变更)
alembic revision --autogenerate -m "描述信息"

# 执行迁移
alembic upgrade head

# 回滚到上一个版本
alembic downgrade -1

# 查看当前版本
alembic current

# 查看迁移历史
alembic history

Alembic 的 env.py 配置了自动导入所有模型(auto_import_models),新建模型无需手动注册。

Released under the MIT License.