数据库架构
异步引擎配置
基于 SQLAlchemy 2.0 的异步引擎,支持连接池管理和自动重连:
python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
engine = create_async_engine(
settings.DATABASE_URL,
echo=settings.DEBUG, # 调试模式打印 SQL
pool_size=5, # 连接池常驻连接数
max_overflow=10, # 超出 pool_size 可创建的连接数(总计最多 15)
pool_timeout=30, # 获取连接超时(秒)
pool_recycle=600, # 连接回收时间(秒)
pool_pre_ping=True, # 使用前检查连接有效性,自动重连
pool_reset_on_return="rollback", # 归还时重置状态
)连接池参数说明
| 参数 | 默认值 | 说明 |
|---|---|---|
pool_size | 5 | 常驻连接数 |
max_overflow | 10 | 最大溢出连接数 |
pool_timeout | 30s | 获取连接超时时间 |
pool_recycle | 600s | 连接回收周期 |
pool_pre_ping | True | 使用前检测连接有效性 |
连接池监控
通过 SQLAlchemy 事件监听连接池状态:
python
@event.listens_for(engine.sync_engine, "checkout")
def _on_checkout(dbapi_conn, connection_rec, connection_proxy):
pool = engine.sync_engine.pool
logger.debug(
f"[Pool] checkout: size={pool.size()}, checkedin={pool.checkedin()}, "
f"checkedout={pool.checkedout()}, overflow={pool.overflow()}"
)会话管理
提供三种会话获取方式,适用于不同场景:
1. 普通会话 get_db()
最常用的方式,每个请求获取独立会话,需要手动提交:
python
from app.database import get_db
@router.post("/")
async def create_item(db: AsyncSession = Depends(get_db)):
item = MyModel(name="test")
db.add(item)
await db.commit() # 需要手动提交
return item2. 事务会话 get_db_transaction()
自动管理事务,成功自动提交,异常自动回滚:
python
from app.database import get_db_transaction
@router.post("/")
async def create_items(db: AsyncSession = Depends(get_db_transaction)):
# 所有操作在同一事务中
await ServiceA.create(db, data1, auto_commit=False)
await ServiceB.create(db, data2, auto_commit=False)
# 自动提交或回滚,无需手动调用 commit()注意
使用 get_db_transaction 时,Service 方法应设置 auto_commit=False,避免重复提交。
3. 事务上下文管理器 transaction()
在普通会话中包装事务:
python
from app.database import get_db, transaction
@router.post("/")
async def create_items(db: AsyncSession = Depends(get_db)):
async with transaction(db):
await ServiceA.create(db, data1, auto_commit=False)
await ServiceB.create(db, data2, auto_commit=False)
# 退出 with 块时自动提交或回滚多数据库支持
通过 DB_TYPE 配置切换数据库:
ini
# PostgreSQL(推荐)
DB_TYPE=postgresql
DB_HOST=localhost
DB_PORT=5432
# MySQL
DB_TYPE=mysql
DB_HOST=localhost
DB_PORT=3306
# SQL Server
DB_TYPE=sqlserver
DB_HOST=localhost
DB_PORT=1433自动拼接对应的连接 URL:
| DB_TYPE | 驱动 | URL 格式 |
|---|---|---|
| postgresql | asyncpg | postgresql+asyncpg://user:pass@host:port/db |
| mysql | aiomysql | mysql+aiomysql://user:pass@host:port/db?charset=utf8mb4 |
| sqlserver | aioodbc | mssql+aioodbc://user:pass@host:port/db?driver=ODBC+Driver+18 |
数据库迁移
使用 Alembic 管理数据库迁移:
bash
# 生成迁移脚本(自动检测模型变更)
alembic revision --autogenerate -m "描述信息"
# 执行迁移
alembic upgrade head
# 回滚到上一个版本
alembic downgrade -1
# 查看当前版本
alembic current
# 查看迁移历史
alembic historyAlembic 的 env.py 配置了自动导入所有模型(auto_import_models),新建模型无需手动注册。