测试与FastAPI应用数据之间的差异

【squids.cn】 全网zui低价RDS,免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等

当使用两个不同的异步会话来测试FastAPI应用程序与数据库的连接时,可能会出现以下错误:

  1. 在测试中,在数据库中创建了一个对象(测试会话)。 

  2. 在应用程序中发出一个请求,在此请求中修改了这个对象(应用会话)。 

  3. 在测试中从数据库加载对象,但其中没有所需的更改(测试会话)。 

让我们找出发生了什么。

我们通常在应用程序和测试中使用两个不同的会话。

此外,在测试中,我们通常将会话包装在一个准备数据库进行测试的fixture中,测试完成后,所有内容都会被清理。

以下是应用程序的示例。

一个带有数据库连接的文件app/database.py:

""" Database settings file """
from typing import AsyncGenerator
​
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import declarative_base
​
DATABASE_URL = "postgresql+asyncpg://user:password@host:5432/dbname"
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
async_session = async_sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
​
​
async def get_session() -> AsyncGenerator:""" Returns async session """async with async_session() as session:yield session
​
Base = declarative_base()

一个带有模型描述的文件app/models.py: 

""" Model file """from sqlalchemy import Integer, Stringfrom sqlalchemy.orm import Mapped, mapped_columnfrom .database import Baseclass Lamp(Base):   """ Lamp model """   __tablename__ = 'lamps'   id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)   status: Mapped[str] = mapped_column(String, default="off")

一个带有端点描述的文件app/main.py: 

""" Main file """import loggingfrom fastapi import FastAPI, Dependsfrom sqlalchemy import selectfrom sqlalchemy.ext.asyncio import AsyncSessionfrom .database import get_sessionfrom .models import Lampapp = FastAPI()@app.post("/lamps/{lamp_id}/on")async def check_lamp(       lamp_id: int,       session: AsyncSession = Depends(get_session)) -> dict:   """ Lamp on endpoint """   results = await session.execute(select(Lamp).where(Lamp.id == lamp_id))   lamp = results.scalar_one_or_none()   if lamp:       logging.error("Status before update: %s", lamp.status)       lamp.status = "on"       session.add(lamp)       await session.commit()       await session.refresh(lamp)       logging.error("Status after update: %s", lamp.status)   return {}

我特意在示例中添加了日志记录和一些其他请求,以使其更加清晰。

这里,使用Depends创建了一个会话。

以下是带有测试示例的文件tests/test_lamp.py:

""" Test lamp """
import logging
from typing import AsyncGenerator
​
import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
​
from app.database import Base, engine
from app.main import app, Lamp
​
​
@pytest_asyncio.fixture(scope="function", name="test_session")
async def test_session_fixture() -> AsyncGenerator:""" Async session fixture """async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
​async with async_session() as session:async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)
​yield session
​async with engine.begin() as conn:await conn.run_sync(Base.metadata.drop_all)
​await engine.dispose()
​
​
@pytest.mark.asyncio
async def test_lamp_on(test_session):""" Test lamp switch on """lamp = Lamp()test_session.add(lamp)await test_session.commit()await test_session.refresh(lamp)
​logging.error("New client status: %s", lamp.status)assert lamp.status == "off"
​async with AsyncClient(app=app, base_url="http://testserver") as async_client:response = await async_client.post(f"/lamps/{lamp.id}/on")assert response.status_code == 200results = await test_session.execute(select(Lamp).where(Lamp.id == lamp.id))new_lamp = results.scalar_one_or_none()logging.error("Updated status: %s", new_lamp.status)
​assert new_lamp.status == "on"

这是一个常规的Pytest,它在一个fixture中获取到数据库的会话。在返回会话之前,此fixture会先创建所有的表格,使用之后,它们会被删除。

请再次注意,在测试中,我们使用来自test_session fixture的会话,而在主代码中,我们使用来自app/database.py文件的会话。尽管我们使用相同的引擎,但是生成的会话是不同的。这一点很重要。

预期的数据库请求序列

应从数据库返回status = on。

在测试中,我首先在数据库中创建一个对象。这是通过来自测试的会话进行的常规INSERT。我们称其为Session 1。此时,只有这个会话连接到了数据库。应用程序会话尚未连接。

在创建对象之后,我执行了一个刷新操作。这是通过Session 1对新创建的对象进行的SELECT,并通过实例更新。

结果,我确保对象正确创建,并且status字段填充了所需的值 - off。

然后,我对/lamps/1/on端点执行一个POST请求。这是打开灯的操作。为了使示例更短,我没有使用fixture。一旦请求开始工作,将创建一个新的数据库会话。我们称其为Session 2。使用这个会话,我从数据库加载所需的对象。我将状态输出到日志。它是off。之后,我更新了这个状态并在数据库中保存了更新。数据库收到了一个请求:

BEGIN (implicit)UPDATE lamps SET status=$1::VARCHAR WHERE lamps.id = $2::INTEGERparameters: ('on', 1)COMMIT

请注意,也存在COMMIT命令。尽管事务是隐式的,但其结果在其他会话中在COMMIT之后立即可用。

接下来,我使用refresh发出请求,从数据库获取更新后的对象。我输出状态。现在它的值是on。

看起来一切都应该正常工作。端点停止工作,关闭Session 2,并将控制权转移到测试。

在测试中,我从Session 1发出常规请求,以获取修改过的对象。但在状态字段中,我看到的值是off。

以下是代码中操作序列的方案。

代码中的操作序列

代码中的操作序列 同时,根据所有日志,最后一个SELECT请求被执行并返回了status = on。此刻,数据库中的其值肯定等于on。这是engine asyncpg响应SELECT请求时接收的值。

那么,发生了什么?

发生了以下情况。

结果是,为获取新对象而做的请求并没有更新当前的对象,而是找到并使用了现有的对象。一开始,我使用ORM添加了一个灯泡对象。我在另一个会话中更改了它。当更改完成时,当前会话对此更改一无所知。并且在Session 2中进行的提交并未在Session 1中请求expire_all方法。

为了修复这个问题,你可以做以下操作之一:

  1. 对于测试和应用程序使用共享会话。

  2. 刷新实例,而不是尝试从数据库中获取它。

  3. 强制使实例过期。

  4. 关闭会话。

依赖性覆盖

为了使用相同的会话,您可以简单地使用我在测试中创建的那个覆盖应用程序中的会话。这很简单。

为此,我们需要在测试中添加以下代码:

async def _override_get_db():   yield test_sessionapp.dependency_overrides[get_session] = _override_get_db

如果你愿意,可以将这部分包装成一个夹具,以便在所有测试中使用。

所得到的算法将如下所示:

代码中使用依赖性覆盖时的步骤

下面是带有会话替代的测试代码:

@pytest.mark.asyncioasync def test_lamp_on(test_session):   """ Test lamp switch on """   async def _override_get_db():       yield test_session   app.dependency_overrides[get_session] = _override_get_db   lamp = Lamp()   test_session.add(lamp)   await test_session.commit()   await test_session.refresh(lamp)   logging.error("New client status: %s", lamp.status)   assert lamp.status == "off"   async with AsyncClient(app=app, base_url="http://testserver") as async_client:       response = await async_client.post(f"/lamps/{lamp.id}/on")       assert response.status_code == 200   results = await test_session.execute(select(Lamp).where(Lamp.id == 1))   new_lamp = results.scalar_one_or_none()   logging.error("Updated status: %s", new_lamp.status)   assert new_lamp.status == "on"

但是,如果应用程序使用多个会话(这是可能的),那么这可能不是最佳方法。此外,如果在被测试的函数中没有调用commit或rollback,那么这也将无济于事。

刷新 

第二种解决方案是最简单和最有逻辑的。我们不应该创建一个新的请求来获取一个对象。为了更新,处理端点请求后立即调用刷新就足够了。在内部,它调用expires,这导致保存的实例不用于新的请求,并且数据重新填充。这种解决方案是最有逻辑的,也最容易理解。

await test_session.refresh(lamp)

之后,你不需要再试图加载new_lamp对象,只需检查相同的lamp。

以下是使用刷新的代码方案。

使用刷新时的代码中的步骤 

以下是带有更新的测试代码。

@pytest.mark.asyncioasync def test_lamp_on(test_session):   """ Test lamp switch on """   lamp = Lamp()   test_session.add(lamp)   await test_session.commit()   await test_session.refresh(lamp)   logging.error("New client status: %s", lamp.status)   assert lamp.status == "off"   async with AsyncClient(app=app, base_url="http://testserver") as async_client:       response = await async_client.post(f"/lamps/{lamp.id}/on")       assert response.status_code == 200   await test_session.refresh(lamp)   logging.error("Updated status: %s", lamp.status)   assert lamp.status == "on"

过期 

但是,如果我们更改了很多对象,最好调用expire_all。然后,所有实例都将从数据库中读取,一致性不会被破坏。

test_session.expire_all()

你还可以在特定实例甚至实例属性上调用expire。

test_session.expire(lamp)

在这些调用之后,你将不得不手动从数据库中读取对象。

以下是使用过期时代码中的步骤序列。

使用过期时的代码中的步骤

使用过期时的代码中的步骤 以下是带有过期的测试代码。

@pytest.mark.asyncioasync def test_lamp_on(test_session):   """ Test lamp switch on """   lamp = Lamp()   test_session.add(lamp)   await test_session.commit()   await test_session.refresh(lamp)   logging.error("New client status: %s", lamp.status)   assert lamp.status == "off"   async with AsyncClient(app=app, base_url="http://testserver") as async_client:       response = await async_client.post(f"/lamps/{lamp.id}/on")       assert response.status_code == 200   test_session.expire_all()   # OR:   # test_session.expire(lamp)   results = await test_session.execute(select(Lamp).where(Lamp.id == 1))   new_lamp = results.scalar_one_or_none()   logging.error("Updated status: %s", new_lamp.status)   assert new_lamp.status == "on"

关闭 

实际上,使用会话终止的最后一种方法也调用了expire_all,但会话可以进一步使用。当读取新数据时,我们将获得最新的对象。

await test_session.close()

这应该在应用程序请求完成之后并在检查开始之前立即调用。

以下是使用关闭时代码中的步骤。

使用关闭时的代码中的步骤

以下是带有会话关闭的测试代码。

@pytest.mark.asyncioasync def test_lamp_on(test_session):   """ Test lamp switch on """   lamp = Lamp()   test_session.add(lamp)   await test_session.commit()   await test_session.refresh(lamp)   logging.error("New client status: %s", lamp.status)   assert lamp.status == "off"   async with AsyncClient(app=app, base_url="http://testserver") as async_client:       response = await async_client.post(f"/lamps/{lamp.id}/on")       assert response.status_code == 200   await test_session.close()   results = await test_session.execute(select(Lamp).where(Lamp.id == 1))   new_lamp = results.scalar_one_or_none()   logging.error("Updated status: %s", new_lamp.status)   assert new_lamp.status == "on"

调用 rollback() 也会有帮助。它也调用 expire_all,但它明确地回滚了事务。如果需要执行事务,commit() 也会执行 expire_all。但在这个例子中,既不需要回滚也不需要提交,因为测试中的事务已经完成,应用程序中的事务不会影响来自测试的会话。

实际上,此功能仅在SQLAlchemy ORM的异步模式中在事务中工作。然而,在代码中我确实向数据库发出请求以获取新对象的行为看起来似乎不合逻辑,如果它仍然返回一个缓存的对象,而不是从数据库强制接收的对象。当调试代码时,这有点令人困惑。但当正确使用时,这就是它应有的样子。

结论 

在异步模式下使用SQLAlchemy ORM,您必须并行跟踪事务和线程中的会话。如果所有这些看起来太复杂,那么使用SQLAlchemy ORM的同步模式。它里面的一切都简单得多。

作者:Aleksei Sharypov

更多内容请关注公号【云原生数据库】

squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/140337.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分布式应用之监控平台zabbix的认识与搭建

一、监控系统的相关知识 1.1 监控系统运用的原因 当我们需要实时关注与其相关的各项指标是否正常,往往存在着很多的服务器、网络设备等硬件资源,如果我们想要能够更加方便的、集中的监控他们,zabix可以实现集中监控管理的应用程序 监控的…

微服务保护

1.初识Sentinel 1.1.雪崩问题及解决方案 1.雪崩问题 微服务中,服务间调用关系错综复杂,一个微服务往往依赖于多个其它微服务。 如图,如果服务提供者I发生了故障,当前的应用的部分业务因为依赖于服务I,因此也会被阻塞…

Maven高级---分模块设计,继承(继承关系/版本锁定/自定义属性)

目录 分模块设计 继承与聚合 继承关系 ​案例​ 版本锁定 自定义属性/引用属性 分模块设计 把一个项目拆分成不同的模块 我们可以把原来一个项目包中的东西单独提出来作为一个模块,也是解耦的思想 然后我们可以通过引入依赖的方式将这两个模块引入,如下 继承与聚合 继…

基于springboot高校场馆预订系统

博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容:毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目介绍…

Linux查看系统信息

# 查看操作系统的详细信息 uname -a# 查看已安装的Linux发行版信息 cat /etc/os-release# 查看Linux Standard Base (LSB)的信息 lsb_release -a# 查看主机的信息 hostnamectl# 查看文件系统的磁盘空间使用情况 df -h# 查看系统内存的使用情况 free -h# 查看网络接口的信息 ifc…

Android内存优化内存抖动的解决实战

问题背景 假设我们有一个应用,它的功能是在一个TextView上显示一个计数器,每隔一秒钟就更新一次计数器的值。为了实现这个功能,我们使用了一个Handler来发送空消息,并在接收到消息时更新计数器的值,并再次发送空消息&…

利用爬虫技术自动化采集汽车之家的车型参数数据

导语 汽车之家是一个专业的汽车网站,提供了丰富的汽车信息,包括车型参数、图片、视频、评测、报价等。如果我们想要获取这些信息,我们可以通过浏览器手动访问网站,或者利用爬虫技术自动化采集数据。本文将介绍如何使用Python编写…

Android 实现椭圆形中心发散渐变绘制

1. 前言 工作中有一个需求,需要实现一个椭圆形的中心发散渐变效果,并且可以自由指定椭圆比例,旋转角度等。 Android中实现椭圆形的绘制很简单,只需要调用canvas.drawOval()就可以了,圆形的中心发散渐变可以使用RadialGradient,但是椭圆形的中心发散渐变效果Android自带的…

浅谈Deep Learning 与 Machine Learning 与Artificial Intelligence

文章目录 三者的联系与区别 三者的联系与区别 “Deep Learning is a kind of Machine Learning, and Machine Learning is a kind of Artificial Intelligence.” 人工智能(AI),机器学习(Machine Learning,简称ML&am…

Linux环境变量配置说明(配置jdk为例-摘录自尚硅谷技术文档)

配置环境变量的不同方法 Linux的环境变量可在多个文件中配置,如/etc/profile,/etc/profile.d/.sh,~/.bashrc,~/.bash_profile等,下面说明上述几个文件之间的关系和区别。 bash的运行模式可分为login shell和non-login shell。 例…

Unity3D C# 反射与特性的配合使用

需求分析 情况: 假如我们是一个动物园的管理员,我们需要统计园内的所有动物和动物的行为。 举例: 现在园区内有猫、狗和鸡。猫对应的行为是喵喵喵和卖萌,狗对应狗吠和干饭,鸡对应篮球和打鸣那么这时候我要统计这些&a…

R语言风险价值:ARIMA,GARCH,Delta-normal法滚动估计VaR(Value at Risk)和回测分析股票数据...

全文链接:http://tecdat.cn/?p24492 此分析的目的是构建一个过程,以在给定时变波动性的情况下正确估计风险价值。风险价值被广泛用于衡量金融机构的市场风险。我们的时间序列数据包括 1258 天的股票收益(点击文末“阅读原文”获取完整代码数…

Linux chmod命令——修改权限信息

我们可以使用chmod命令,修改文件、文件夹的权限信息。注意,只有文件、文件夹的所属用户或root用户可以修改。 chmod [-R] 权限 文件或文件夹 -R,对文件夹内的全部内容应用同样的操作 例如: chmod urwx,grx,ox hello.txt &…

基于微信小程序的项目申报管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言用户微信端的主要功能有:管理员的主要功能有:具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding)有保障的售后福利 代码参考源码获取 前言 💗博主介绍:✌全网粉…

flarum 论坛 User Statistics插件修改

此插件在中国使用日期不是很理想,于是决定修改代码 下面是插件信息: User Statistics A Flarum extension. Add some user statistics in flarum posts, this extension require clarkwinkelmann/flarum-ext-likes-received and will be installed au…

【postgresql】替换 mysql 中的ifnull()

数据库由mysql 迁移到postgresql,程序在执行查询时候报错。 HINT: No function matches the given name and argument types. You might need to add explicit type casts. CONTEXT: referenced column: ifnull 具体SQL: SELECT ifnull(phone,) FROM c_user p…

Rust中的结构体

专栏简介:本专栏作为Rust语言的入门级的文章,目的是为了分享关于Rust语言的编程技巧和知识。对于Rust语言,虽然历史没有C、和python历史悠远,但是它的优点可以说是非常的多,既继承了C运行速度,还拥有了Java…

钾和钠含量

声明 本文是学习GB-T 397-2022 商品煤质量 炼焦用煤. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本文件规定了炼焦用商品煤产品质量等级和技术要求、试验方法、检验规则、标识、运输及贮存。 本文件适用于生产、加工、储运、销售、使用…

MySQL索引事务

一 、 索引 索引是一种特殊的文件,包含着对数据表里所有记录的引用指针。 可以对表中的一列或多列创建索引, 并指定索引的类型,各类索引有各自的数据结构实现。 索引保存的数据结构主要为B树,及hash的方式。 1. 作用 数据库中…

网络安全攻防对抗之隐藏通信隧道技术整理

完成内网信息收集工作后,渗透测试人员需要判断流量是否出得去、进得来。隐藏通信隧道技术常用于在访问受限的网络环境中追踪数据流向和在非受信任的网络中实现安全的数据传输。 一、隐藏通信隧道基础知识 (一)隐藏通信隧道概述 一般的网络通…