FastAPI 数据库集成
FastAPI 数据库集成学习笔记FastAPI 本身不绑定特定数据库但通过SQLAlchemy(ORM) 或SQLModel(SQLAlchemy 的扩展) 可以非常优雅地集成关系型数据库PostgreSQL, MySQL, SQLite。对于 NoSQL通常使用Motor(MongoDB) 或Redis客户端。本笔记重点讲解SQLAlchemy 2.0 (Async)和SQLModel的集成方案这是目前最主流和推荐的方式。一、核心架构FastAPI 数据库集成的标准模式是依赖注入 (Dependency Injection)管理数据库会话请求 → 依赖函数 (get_db) → 创建 Session → 执行业务逻辑 → 提交/回滚 → 关闭 Session关键组件Engine: 数据库连接池管理底层连接。Session: 一次数据库操作的上下文管理事务。Model: 数据库表结构定义 (ORM)。Dependency: 提供 Session 给路由确保自动关闭。二、方案 A使用 SQLAlchemy (原生 ORM)1. 安装依赖pipinstallsqlalchemy[asyncio]aiosqliteasyncpgdatabases# 如果是 MySQL: pip install mysql-async# 如果是 PostgreSQL: pip install asyncpg2. 数据库配置 (database.py)fromsqlalchemy.ext.asyncioimportcreate_async_engine,AsyncSession,async_sessionmakerfromsqlalchemy.ormimportdeclarative_base# 数据库 URL# SQLite: sqliteaiosqlite:///./test.db# PostgreSQL: postgresqlasyncpg://user:passlocalhost/dbname# MySQL: mysqlaiomysql://user:passlocalhost/dbnameDATABASE_URLsqliteaiosqlite:///./test.db# 创建引擎 (异步)enginecreate_async_engine(DATABASE_URL,echoTrue,# 打印 SQL 日志生产环境设为 FalsefutureTrue,)# 创建会话工厂AsyncSessionLocalasync_sessionmaker(engine,class_AsyncSession,expire_on_commitFalse,autocommitFalse,autoflushFalse,)# 基类用于定义模型Basedeclarative_base()# 依赖函数获取 Sessionasyncdefget_db():asyncwithAsyncSessionLocal()assession:try:yieldsessionawaitsession.commit()# 成功则提交exceptException:awaitsession.rollback()# 失败则回滚raisefinally:awaitsession.close()3. 定义模型 (models.py)fromsqlalchemyimportColumn,Integer,String,DateTimefromsqlalchemy.sqlimportfuncfromdatabaseimportBaseclassUser(Base):__tablename__usersidColumn(Integer,primary_keyTrue,indexTrue)emailColumn(String,uniqueTrue,indexTrue,nullableFalse)usernameColumn(String,uniqueTrue,indexTrue,nullableFalse)hashed_passwordColumn(String,nullableFalse)is_activeColumn(Integer,default1)created_atColumn(DateTime(timezoneTrue),server_defaultfunc.now())4. 创建表 (main.py)fromfastapiimportFastAPIfromdatabaseimportengine,BasefrommodelsimportUser appFastAPI()app.on_event(startup)asyncdefstartup():# 异步创建表asyncwithengine.begin()asconn:awaitconn.run_sync(Base.metadata.create_all)5. 路由操作 (routers/users.py)fromfastapiimportAPIRouter,Depends,HTTPException,statusfromsqlalchemy.ext.asyncioimportAsyncSessionfromsqlalchemyimportselectfromtypingimportListfromdatabaseimportget_dbfrommodelsimportUserfrompydanticimportBaseModel routerAPIRouter(prefix/users,tags[users])# Pydantic 模型classUserCreate(BaseModel):email:strusername:strpassword:strclassUserResponse(BaseModel):id:intemail:strusername:stris_active:boolclassConfig:from_attributesTrue# SQLAlchemy 2.0 替代 orm_moderouter.post(/,response_modelUserResponse)asyncdefcreate_user(user:UserCreate,db:AsyncSessionDepends(get_db)):# 检查是否存在resultawaitdb.execute(select(User).where(User.emailuser.email))ifresult.scalar_one_or_none():raiseHTTPException(status_code400,detailEmail already registered)# 创建新对象db_userUser(emailuser.email,usernameuser.username,hashed_passworduser.password# 实际应哈希)db.add(db_user)awaitdb.commit()awaitdb.refresh(db_user)returndb_userrouter.get(/,response_modelList[UserResponse])asyncdefread_users(skip:int0,limit:int10,db:AsyncSessionDepends(get_db)):resultawaitdb.execute(select(User).offset(skip).limit(limit))usersresult.scalars().all()returnusersrouter.get(/{user_id},response_modelUserResponse)asyncdefread_user(user_id:int,db:AsyncSessionDepends(get_db)):resultawaitdb.execute(select(User).where(User.iduser_id))userresult.scalar_one_or_none()ifnotuser:raiseHTTPException(status_code404,detailUser not found)returnuser三、方案 B使用 SQLModel (推荐)SQLModel是 FastAPI 作者开发的库无缝集成 SQLAlchemy Pydantic代码更简洁类型提示更友好。1. 安装pipinstallsqlmodel2. 配置 (database.py)fromsqlmodelimportSQLModel,create_engine,SessionfromtypingimportGenerator# 注意SQLModel 默认使用同步引擎但也可以配置异步# 这里演示同步 Session (FastAPI 对同步支持也很好性能差异不大)SQLITE_URLsqlite:///./test.dbenginecreate_engine(SQLITE_URL,echoTrue,connect_args{check_same_thread:False})defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defget_session()-Generator[Session,None,None]:withSession(engine)assession:yieldsession3. 定义模型 (models.py)fromtypingimportOptionalfromsqlmodelimportSQLModel,FieldclassUser(SQLModel,tableTrue):__tablename__usersid:Optional[int]Field(defaultNone,primary_keyTrue)email:strField(uniqueTrue,indexTrue)username:strField(uniqueTrue,indexTrue)hashed_password:stris_active:boolTrue4. 路由 (routers/users.py)fromfastapiimportAPIRouter,Depends,HTTPException,statusfromsqlmodelimportSession,selectfromtypingimportListfromdatabaseimportget_sessionfrommodelsimportUser,UserCreate# 假设 UserCreate 是纯 Pydantic 模型routerAPIRouter(prefix/users,tags[users])router.post(/,response_modelUser)defcreate_user(user:UserCreate,session:SessionDepends(get_session)):# 检查存在existingsession.exec(select(User).where(User.emailuser.email)).first()ifexisting:raiseHTTPException(status_code400,detailEmail exists)db_userUser(**user.dict())session.add(db_user)session.commit()session.refresh(db_user)returndb_userrouter.get(/,response_modelList[User])defread_users(skip:int0,limit:int10,session:SessionDepends(get_session)):userssession.exec(select(User).offset(skip).limit(limit)).all()returnusers优势SQLModel自动处理from_attributes无需手动配置 Pydantic 的orm_mode。四、异步 vs 同步特性异步 (Async)同步 (Sync)依赖sqlalchemy[asyncio],aiosqlite,asyncpgsqlalchemy性能高并发下更优不阻塞事件循环简单场景足够代码更易读代码风格需async/await普通函数适用场景高并发 I/O 密集型一般业务快速开发FastAPI 支持完美支持完美支持 (自动在线程池运行)建议新项目优先尝试SQLModel (同步)开发效率最高。高并发场景或已有异步生态使用SQLAlchemy Async。五、数据库迁移 (Alembic)生产环境不能直接create_all需要使用Alembic进行版本控制迁移。1. 安装与初始化pipinstallalembic alembic init alembic2. 配置alembic.ini和env.py在alembic/env.py中配置target_metadata和sqlalchemy.url# env.pyfromsqlmodelimportSQLModelfromdatabaseimportengine# 确保 target_metadata 指向你的 Base 或 SQLModel.metadatatarget_metadataSQLModel.metadata# 配置 URLconfig.set_main_option(sqlalchemy.url,sqlite:///./test.db)3. 生成迁移# 生成迁移脚本alembic revision--autogenerate-mInitial migration# 应用迁移alembic upgradehead4. 回滚alembic downgrade-1六、事务管理1. 自动提交 (依赖注入中)在get_db依赖中yield后自动commit异常时rollback。2. 手动控制事务fromfastapiimportDependsfromsqlalchemy.ext.asyncioimportAsyncSessionfromsqlalchemyimportselectrouter.post(/batch)asyncdefbatch_create(users:List[UserCreate],db:AsyncSessionDepends(get_db)):try:foruinusers:db_userUser(**u.dict())db.add(db_user)awaitdb.commit()# 全部成功才提交return{msg:All created}exceptException:awaitdb.rollback()raise七、常见错误与排查错误原因解决方案sqlite3.OperationalError: database is locked同步 SQLite 并发写冲突使用check_same_threadFalse或改用 PostgreSQLRuntimeError: This event loop is already running在异步环境中使用同步驱动改用aiosqlite/asyncpgDetachedInstanceErrorSession 关闭后访问对象使用lazyjoined或session.refresh()Column not found模型修改后未迁移运行alembic revision --autogenerateCircular dependency模型间循环引用使用ForeignKey字符串引用或relationship的back_populates八、最佳实践总结使用依赖注入永远通过Depends(get_db)获取 Session不要全局单例。分离模型ORM Model: 对应数据库表 (SQLAlchemy/SQLModel)。Pydantic Model: 对应请求/响应 (不含敏感字段如hashed_password)。异步优先高并发场景使用AsyncSession。迁移管理生产环境必须使用 Alembic禁止自动建表。连接池配置合理的pool_size和max_overflow。错误处理捕获数据库异常返回友好的 HTTP 错误。九、完整项目结构示例project/ ├── app/ │ ├── __init__.py │ ├── main.py # 启动注册路由创建表 │ ├── config.py # 配置 (DB_URL, SECRET_KEY) │ ├── database.py # Engine, Session, get_db │ ├── models.py # SQLAlchemy/SQLModel 模型 │ ├── schemas.py # Pydantic 请求/响应模型 │ ├── dependencies.py # 认证、权限等依赖 │ └── routers/ │ ├── auth.py │ └── users.py ├── alembic/ # 迁移脚本 ├── alembic.ini └── requirements.txt通过这种结构FastAPI 项目可以保持高可维护性、类型安全和良好的扩展性。