FastAPI 是一个现代、快速(高性能)的 Web 框架,能够与异步数据库库(如 SQLAlchemy)完美结合。本篇文章将通过一个 MySQL 数据库的示例,介绍如何在 FastAPI 中配置数据库连接、定义数据库模型、管理数据库会话,并演示如何进行数据库的增删查改和事务操作。
环境准备
首先,我们需要安装 FastAPI、SQLAlchemy 和异步数据库支持库。假设我们使用 MySQL 作为数据库,所需的依赖库如下:
pip install fastapi
pip install uvicorn
pip install sqlalchemy
pip install mysql-connector-python
pip install asyncmy # MySQL 的异步驱动
1. 数据库配置
数据库配置部分与前面相同,保持不变:
DATABASE_URL = "mysql+asyncmy://user:password@localhost/db_name"
2. 引擎和会话的配置
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker# 创建异步引擎
engine = create_async_engine(DATABASE_URL, echo=True)# 创建异步 Session 类
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False
)
3. 定义数据库模型
继续使用 SQLAlchemy ORM 定义 Item
模型:
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base# 基类
Base = declarative_base()# 定义 Item 模型
class Item(Base):__tablename__ = "items"id = Column(Integer, primary_key=True, index=True)name = Column(String, index=True)description = Column(String)
4. 获取数据库会话
from fastapi import Depends# 获取数据库会话
async def get_db():async with AsyncSessionLocal() as session:yield session
5. 数据库初始化
在应用启动时初始化数据库,确保表格的创建:
from fastapi import FastAPIapp = FastAPI()# 初始化数据库
@app.on_event("startup")
async def on_startup():async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)
6. 数据库的增删查改(CRUD)
创建数据
将原始的 SQL 查询方式改为 SQLAlchemy ORM 的增操作:
from fastapi import HTTPException@app.post("/items/")
async def create_item(item: Item, db: AsyncSession = Depends(get_db)):db.add(item) # 添加物品await db.commit() # 提交事务await db.refresh(item) # 刷新数据return item
查询数据
改用 SQLAlchemy ORM 的查询方法(filter
)来替代 SQL 查询语句:
@app.get("/items/{item_id}")
async def get_item(item_id: int, db: AsyncSession = Depends(get_db)):# 使用 SQLAlchemy ORM 的 filter 查询item = await db.execute(select(Item).filter(Item.id == item_id))item = item.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")return item
select(Item)
:查询Item
表。filter(Item.id == item_id)
:根据item_id
查找特定物品。scalars().first()
:获取查询结果中的第一个对象。
更新数据
使用 SQLAlchemy ORM 完成数据的更新:
from sqlalchemy.future import select@app.put("/items/{item_id}")
async def update_item(item_id: int, updated_item: Item, db: AsyncSession = Depends(get_db)):# 使用 SQLAlchemy ORM 的 filter 查询result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")# 更新字段item.name = updated_item.nameitem.description = updated_item.descriptionawait db.commit() # 提交更新await db.refresh(item) # 刷新数据return item
- 通过
select(Item).filter(Item.id == item_id)
查询物品。 - 修改查询结果的字段后,通过
db.commit()
提交更新。
删除数据
将 SQL 查询语句改为使用 SQLAlchemy ORM 的 delete
方法:
@app.delete("/items/{item_id}")
async def delete_item(item_id: int, db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")# 删除物品await db.delete(item)await db.commit() # 提交删除return {"message": "Item deleted successfully"}
7. 事务的使用
使用 SQLAlchemy 的事务管理功能来确保多个数据库操作的一致性:
@app.post("/create_multiple_items/")
async def create_multiple_items(items: list[Item], db: AsyncSession = Depends(get_db)):async with db.begin(): # 使用事务for item in items:db.add(item)await db.commit()return {"message": "Items created successfully"}
async with db.begin()
启动一个事务,所有的数据库操作都将在该事务内进行,要么全部成功,要么全部失败。
完整代码示例
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_baseDATABASE_URL = "mysql+asyncmy://user:password@localhost/db_name"# 创建异步引擎
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)Base = declarative_base()# 定义 Item 模型
class Item(Base):__tablename__ = "items"id = Column(Integer, primary_key=True, index=True)name = Column(String, index=True)description = Column(String)# FastAPI 实例
app = FastAPI()# 获取数据库会话
async def get_db():async with AsyncSessionLocal() as session:yield session# 初始化数据库
@app.on_event("startup")
async def on_startup():async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)# 增加数据
@app.post("/items/")
async def create_item(item: Item, db: AsyncSession = Depends(get_db)):db.add(item)await db.commit()await db.refresh(item)return item# 查询数据
@app.get("/items/{item_id}")
async def get_item(item_id: int, db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")return item# 更新数据
@app.put("/items/{item_id}")
async def update_item(item_id: int, updated_item: Item, db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")item.name = updated_item.nameitem.description = updated_item.descriptionawait db.commit()await db.refresh(item)return item# 删除数据
@app.delete("/items/{item_id}")
async def delete_item(item_id: int, db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")await db.delete(item)await db.commit()return {"message": "Item deleted successfully"}# 批量创建数据
@app.post("/create_multiple_items/")
async def create_multiple_items(items: list[Item], db: AsyncSession = Depends(get_db)):async with db.begin(): # 使用事务for item in items:db.add(item)await db.commit()return {"message": "Items created successfully"}
总结
本文详细讲解了如何在 FastAPI 中使用 SQLAlchemy ORM 来完成 MySQL 数据库的增删查改操作,利用异步的数据库引擎和会话机制,让应用在高并发场景下保持高效。通过 SQLAlchemy ORM,我们不仅可以简化数据库操作,还能提高代码的可读性和维护性。在实际开发中,采用 ORM 方式进行数据访问,是开发者处理数据库交互的常见做法。