Fastapi教程:使用异步sqlalchemy操作mysql

FastAPI 是一个现代、快速(高性能)的 Web 框架,能够与异步数据库库(如 SQLAlchemy)完美结合。本篇文章将通过一个 MySQL 数据库的示例,介绍如何在 FastAPI 中配置数据库连接、定义数据库模型、管理数据库会话,并演示如何进行数据库的增删查改和事务操作。

环境准备

首先,我们需要安装 FastAPI、SQLAlchemy 和异步数据库支持库。假设我们使用 MySQL 作为数据库,所需的依赖库如下:

pip install fastapi
pip install uvicorn
pip install sqlalchemy
pip install mysql-connector-python
pip install asyncmy  # MySQL 的异步驱动

1. 数据库配置

数据库配置部分与前面相同,保持不变:

DATABASE_URL = "mysql+asyncmy://user:password@localhost/db_name"

2. 引擎和会话的配置

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker# 创建异步引擎
engine = create_async_engine(DATABASE_URL, echo=True)# 创建异步 Session 类
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False
)

3. 定义数据库模型

继续使用 SQLAlchemy ORM 定义 Item 模型:

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base# 基类
Base = declarative_base()# 定义 Item 模型
class Item(Base):__tablename__ = "items"id = Column(Integer, primary_key=True, index=True)name = Column(String, index=True)description = Column(String)

4. 获取数据库会话

from fastapi import Depends# 获取数据库会话
async def get_db():async with AsyncSessionLocal() as session:yield session

5. 数据库初始化

在应用启动时初始化数据库,确保表格的创建:

from fastapi import FastAPIapp = FastAPI()# 初始化数据库
@app.on_event("startup")
async def on_startup():async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)

6. 数据库的增删查改(CRUD)

创建数据

将原始的 SQL 查询方式改为 SQLAlchemy ORM 的增操作:

from fastapi import HTTPException@app.post("/items/")
async def create_item(item: Item, db: AsyncSession = Depends(get_db)):db.add(item)  # 添加物品await db.commit()  # 提交事务await db.refresh(item)  # 刷新数据return item

查询数据

改用 SQLAlchemy ORM 的查询方法(filter)来替代 SQL 查询语句:

@app.get("/items/{item_id}")
async def get_item(item_id: int, db: AsyncSession = Depends(get_db)):# 使用 SQLAlchemy ORM 的 filter 查询item = await db.execute(select(Item).filter(Item.id == item_id))item = item.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")return item
  • select(Item):查询 Item 表。
  • filter(Item.id == item_id):根据 item_id 查找特定物品。
  • scalars().first():获取查询结果中的第一个对象。

更新数据

使用 SQLAlchemy ORM 完成数据的更新:

from sqlalchemy.future import select@app.put("/items/{item_id}")
async def update_item(item_id: int, updated_item: Item, db: AsyncSession = Depends(get_db)):# 使用 SQLAlchemy ORM 的 filter 查询result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")# 更新字段item.name = updated_item.nameitem.description = updated_item.descriptionawait db.commit()  # 提交更新await db.refresh(item)  # 刷新数据return item
  • 通过 select(Item).filter(Item.id == item_id) 查询物品。
  • 修改查询结果的字段后,通过 db.commit() 提交更新。

删除数据

将 SQL 查询语句改为使用 SQLAlchemy ORM 的 delete 方法:

@app.delete("/items/{item_id}")
async def delete_item(item_id: int, db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")# 删除物品await db.delete(item)await db.commit()  # 提交删除return {"message": "Item deleted successfully"}

7. 事务的使用

使用 SQLAlchemy 的事务管理功能来确保多个数据库操作的一致性:

@app.post("/create_multiple_items/")
async def create_multiple_items(items: list[Item], db: AsyncSession = Depends(get_db)):async with db.begin():  # 使用事务for item in items:db.add(item)await db.commit()return {"message": "Items created successfully"}
  • async with db.begin() 启动一个事务,所有的数据库操作都将在该事务内进行,要么全部成功,要么全部失败。

完整代码示例

from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_baseDATABASE_URL = "mysql+asyncmy://user:password@localhost/db_name"# 创建异步引擎
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)Base = declarative_base()# 定义 Item 模型
class Item(Base):__tablename__ = "items"id = Column(Integer, primary_key=True, index=True)name = Column(String, index=True)description = Column(String)# FastAPI 实例
app = FastAPI()# 获取数据库会话
async def get_db():async with AsyncSessionLocal() as session:yield session# 初始化数据库
@app.on_event("startup")
async def on_startup():async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)# 增加数据
@app.post("/items/")
async def create_item(item: Item, db: AsyncSession = Depends(get_db)):db.add(item)await db.commit()await db.refresh(item)return item# 查询数据
@app.get("/items/{item_id}")
async def get_item(item_id: int, db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")return item# 更新数据
@app.put("/items/{item_id}")
async def update_item(item_id: int, updated_item: Item, db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")item.name = updated_item.nameitem.description = updated_item.descriptionawait db.commit()await db.refresh(item)return item# 删除数据
@app.delete("/items/{item_id}")
async def delete_item(item_id: int, db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item).filter(Item.id == item_id))item = result.scalars().first()if item is None:raise HTTPException(status_code=404, detail="Item not found")await db.delete(item)await db.commit()return {"message": "Item deleted successfully"}# 批量创建数据
@app.post("/create_multiple_items/")
async def create_multiple_items(items: list[Item], db: AsyncSession = Depends(get_db)):async with db.begin():  # 使用事务for item in items:db.add(item)await db.commit()return {"message": "Items created successfully"}

总结

本文详细讲解了如何在 FastAPI 中使用 SQLAlchemy ORM 来完成 MySQL 数据库的增删查改操作,利用异步的数据库引擎和会话机制,让应用在高并发场景下保持高效。通过 SQLAlchemy ORM,我们不仅可以简化数据库操作,还能提高代码的可读性和维护性。在实际开发中,采用 ORM 方式进行数据访问,是开发者处理数据库交互的常见做法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/489493.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(5)4T刷题-逻辑代数基础

(1)逻辑函数的常用表示方法有:真值表、逻辑图、卡诺图、函数表达式 逻辑函数的表达方法中具有唯一性的是:真值表和卡诺图 (2)异或运算(题干意思不明确,应该是按位异或) …

Linux(网络基础和网络标准OSI七层结构)

后面也会持续更新,学到新东西会在其中补充。 建议按顺序食用,欢迎批评或者交流! 缺什么东西欢迎评论!我都会及时修改的! 在这里真的很感谢这位老师的教学视频让迷茫的我找到了很好的学习视频 王晓春老师的个人空间…

向达梦告警日志说声hello

为了调试和跟踪一些业务功能,通常会创建一个日志表,写入每个关键步骤的信息。也可以向达梦数据库的告警日志输出信息,然后通过查看告警日志即可。 在达梦的告警日志中输出一个信息可以这样 SQL> DBMS_SYSTEM.KSDWRT(2,hi dm);

详解 ES6 Reflect

一. 概念 Reflect 是 ES6 中新增的一个内置对象,它提供了一组静态方法,用于操作对象。这些方法与 Object 上的方法具有相同的功能。在这些方法中会调用对应 Object 上的方法,并且返回对应结果。Reflect 的出现主要是为了将一些 Object 对象上…

图像分割数据集海洋水体船只分割数据集labelme格式6123张3类别

数据集格式:labelme格式(不包含mask文件,仅仅包含jpg图片和对应的json文件) 图片数量(jpg文件个数):6123 标注数量(json文件个数):6123 标注类别数:3 标注类别名称:["water","sea_obstacle",&…

Docker Compose--安装本地maven

原文网址:Docker Compose--安装本地maven-CSDN博客 简介 本文介绍如何使用Docker Compose安装maven。 脚本及配置 路径:/work/env/maven ├── app ├── config │ └── settings.xml ├── docker-compose.yml ├── repository └── t…

EDA - Spring Boot构建基于事件驱动的消息系统

文章目录 概述事件驱动架构的基本概念工程结构Code创建事件和事件处理器创建事件总线创建消息通道和发送逻辑创建事件处理器消息持久化创建消息发送事件配置 Spring Boot 启动类测试消息消费运行项目 概述 在微服务架构和大规模分布式系统中,事件驱动架构&#xff…

数据链路层(Java)(MAC与IP的区别)

以太网协议: "以太⽹" 不是⼀种具体的⽹络, ⽽是⼀种技术标准; 既包含了数据链路层的内容, 也包含了⼀些物理 层的内容. 例如: 规定了⽹络拓扑结构, 访问控制⽅式, 传输速率等; 例如以太⽹中的⽹线必须使⽤双绞线; 传输速率有10M, 100M, 1000M等; 以太…

UNIX数据恢复—UNIX系统常见故障问题和数据恢复方案

UNIX系统常见故障表现: 1、存储结构出错; 2、数据删除; 3、文件系统格式化; 4、其他原因数据丢失。 UNIX系统常见故障解决方案: 1、检测UNIX系统故障涉及的设备是否存在硬件故障,如果存在硬件故障&#xf…

黑马程序员Java项目实战《苍穹外卖》Day12

苍穹外卖-day12 课程内容 工作台Apache POI导出运营数据Excel报表 功能实现:工作台、数据导出 工作台效果图: 数据导出效果图: 在数据统计页面点击数据导出:生成Excel报表 1. 工作台 1.1 需求分析和设计 1.1.1 产品原…

【竞技宝】LOL:JDG官宣yagao离队

北京时间2024年12月13日,在英雄联盟S14全球总决赛结束之后,各大赛区都已经进入了休赛期,目前休赛期也快进入尾声,LPL大部分队伍都开始陆续官宣转会期的动向,其中JDG就在近期正式官宣中单选手yagao离队,而后者大概率将直接选择退役。 近日,JDG战队在官方微博上连续发布阵容变动消…

谷歌浏览器的多账户设置与管理

在数字化时代,我们常常需要在不同的网站和服务上使用多个账户。为了方便管理和保护隐私,谷歌浏览器提供了多账户设置功能。本文将详细介绍如何在Chrome中进行多账户设置与管理,并涵盖一些相关的安全配置和问题解决方法。(本文由ht…

科研绘图系列:R语言绘制网络图和密度分布图(network density plot)

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍加载R包数据下载图1图2图3图4图5图6图7图8系统信息参考介绍 R语言绘制网络图和密度分布图(network & density plot) 加载R包 library(magrittr) library(dplyr) library(…

VRRP的知识点总结及实验

1、VRRP VRRP(Virtual Router Redundancy Protocol,虚拟路由器冗余协议)既能够实现网关的备份,又能解决多个网关之间互相冲突的问题,从而提高网络可靠性。 2、VRRP技术概述: 通过把几台路由设备联合组成一台虚拟的“路由设备”…

数智读书笔记系列002 埃隆·马斯克传

书名:埃隆马斯克传 作者:【美】沃尔特艾萨克森 译者:孙思远;刘家琦 出版社:中信出版集团 出版时间:2023年9月 ISBN:9787521758399 这本书是关于特斯拉CEO埃隆马斯克的传记,作者…

2024年12月13日Github流行趋势

项目名称:nexus-xyz / nexus-zkvm 项目维护者:govereau slumber danielmarinq sjudson yoichi-nexus项目介绍:Nexus zkVM 是一个零知识虚拟机。项目star数:1,948项目fork数:343 项目名称:soxoj / maigret …

016 在路由器上配置 DHCP

配置路由器端口IP地址 将路由器的端口地址配置好, 左边的网络地址是 192.168.1.0 右边的网络地址是 192.168.2.0 配置路由器的DHCP服务 打开命令窗口,进入特权模式 进入全局配置 conf t创建一个DHCP地址池; po1 是地址池的名称&#xf…

使用ElasticSearch实现全文检索

文章目录 全文检索任务描述技术难点任务目标实现过程1. java读取Json文件,并导入MySQL数据库中2. 利用Logstah完成MySQL到ES的数据同步3. 开始编写功能接口3.1 全文检索接口3.2 查询详情 4. 前端调用 全文检索 任务描述 在获取到数据之后如何在ES中进行数据建模&a…

常见软件漏洞修复

1. 3306 MYSQL 升级到最新版,下载地址为 MySQL :: Download MySQL Installer,这里注意,不要跨大版本升级,只升级小版本号,例如mysql5.7.22只需要升级到最新的5.7.44。注意只使用长期维护的稳定版本。设置mysql允许连接…

【C++算法】42.模拟_数青蛙

文章目录 题目链接:题目描述:解法C 算法代码: 题目链接: 1419. 数青蛙 题目描述: 解法 模拟: 利用一个指针,从前往后遍历,遍历到r就看前面有没有c,遍历到o就看前面有没有…