SQLAlchemy 介绍与实践

postgresql 实践
pydantic 实践

1. SQLAlchemy 介绍

SQLAlchemy 是一个 ORM 框架。SQLAlchemy 是一个用于 Python 的 SQL 工具和对象关系映射(ORM)库。它允许你通过 Python 代码来与关系型数据库交互,而不必直接编写SQL语句。
简单介绍一下对象关系映射吧,对象关系映射(英语:Object Relational Mapping,简称 ORM,或O/RM,或O/R mapping),是一种程序设计技术, 用于实现面向对象编程语言里不同类型系统的数据之间的转换。 从效果上说,它其实是创建了一个可在编程语言里使用的“虚拟对象数据库”。大白话:对象模型与数据库表的映射。

1.1. SQLAlchemy 使用场景

SQLAlchemy 是一个强大的 Python ORM 框架,主要应用于以下场景:
数据库访问和操作: SQLAlchemy 提供了高层抽象来操作数据库,可以避免写原生 SQL 语句。支持多种数据库后端(MySQL、MongoDB、SQLite、PostgreSQL)。
ORM映射: 建立 Python 类与数据库表的映射关系,简化数据模型的操作,支持声明式操作。
复杂查询: SQLAlchemy 提供丰富的查询方式,如过滤、分组、联结等,可以构建复杂查询。
异步查询: 基于Greenlet 等实现异步查询,提高查询效率。
事务控制: 通过 Session 管理数据库会话和事务。
工具集成: 如数据迁移工具 Alembic,可以实现 Schema 版本控制和迁移。
大数据集查询: 基于 Pagination 实现数据分页,避免大量数据查询内存溢出。
多数据库支持: 支持 Postgres、MySQL、Oracle 等主流数据库。
Web框架集成: 框架如 Flask 可以集成 SQLAlchemy,便于 Web 应用开发。

2. SQLAlchemy 基本用法

参考:
https://www.jb51.net/python/325524ud6.htm
https://blog.51cto.com/u_13019/12307379

2.1. 安装 SQLAlchemy

在使用 SQLAlchemy 之前,首先需要安装它。可以使用以下命令使用 pip 安装:

pip install sqlalchemy 
pip install pymysql		# 安装 MySQL

2.2. 连接数据库

使用 SQLAlchemy 连接到数据库,需要提供数据库的连接字符串,其中包含有关数据库类型、用户名、密码、主机和数据库名称的信息。

from sqlalchemy import create_engine# 例如,连接到 SQLite 数据库
engine = create_engine('sqlite:///example.db')# 例如,连接到 MySQL 数据库
username = 'your_mysql_username'
password = 'your_mysql_password'
host = 'your_mysql_host'  # 例如:'localhost' 或 '127.0.0.1'
port = 'your_mysql_port'  # 通常是 3306
database = 'your_database_name'
# 创建连接引擎
engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{database}')

2.3. 定义数据模型

使用 SQLAlchemy 的 ORM 功能,可以定义 Python 类来映射数据库中的表。每个类对应数据库中的一张表,类的属性对应表中的列。

# 导入必要的模块
from sqlalchemy import Column, Integer, String, Sequence
from sqlalchemy.ext.declarative import declarative_base# 创建一个基类,用于定义数据模型的基本结构
Base = declarative_base()# 定义一个数据模型类,对应数据库中的 'users' 表
class User(Base):# 定义表名__tablename__ = 'users'# 定义列:id,是整数类型,主键(primary_key=True),并使用 Sequence 生成唯一标识id = Column(Integer, Sequence('user_id_seq'), primary_key=True)# 定义列:name,是字符串类型,最大长度为50name = Column(String(50))# 定义列:age,是整数类型age = Column(Integer)

2.4. 创建表

通过在代码中调用 create_all 方法,可以根据定义的模型创建数据库表。

Base.metadata.create_all(engine)

2.5. 插入数据

使用 SQLAlchemy 进行插入数据的操作,首先需要创建一个会话(Session)对象,然后使用该对象添加数据并提交。

# 导入创建会话的模块
from sqlalchemy.orm import sessionmaker# 使用 sessionmaker 创建一个会话类 Session,并绑定到数据库引擎(bind=engine)
Session = sessionmaker(bind=engine)# 创建一个实例化的会话对象 session
session = Session()# 创建一个新的 User 实例,即要插入到数据库中的新用户
new_user = User(name='John Doe', age=30)# 将新用户添加到会话中,即将其添加到数据库操作队列中
session.add(new_user)# 提交会话,将所有在此会话中的数据库操作提交到数据库
session.commit()

2.6. 使用事务添加数据

如果添加过程中发生任何错误,我们将回滚事务,确保数据库的一致性。

try:# 开始一个新的事务session.begin()# 创建新用户对象user1 = User(name='Alice', email='alice@example.com')user2 = User(name='Bob', email='bob@example.com')# 添加到会话中session.add(user1)session.add(user2)# 提交事务,将所有更改保存到数据库session.commit()print("Users added successfully.")
except Exception as e:# 如果在添加用户过程中发生错误,则回滚事务session.rollback()print(f"An error occurred: {e}")
finally:# 关闭会话session.close()

在这个示例中,我们使用 session.begin() 显式地开始了一个新的事务。然后,我们尝试添加两个新用户到会话中。如果在这个过程中没有发生任何错误,我们使用 session.commit() 提交事务,将所有更改保存到数据库中。但是,如果在添加用户的过程中发生了任何异常(例如,由于重复的电子邮件地址或数据库连接问题),我们将使用 session.rollback() 回滚事务,确保数据库的一致性。

2.7. 查询数据

使用 SQLAlchemy 进行查询数据的操作,可以通过查询语句或使用 ORM 查询接口。

# 使用查询语句
result = engine.execute('SELECT * FROM users')# 使用 ORM 查询接口
users = session.query(User).all()

3. 复杂查询条件

3.1. 连接查询(Join)

假设我们有两个模型, User 和 Order ,并且一个用户可以有多个订单。

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
Base = declarative_base()
class User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String)orders = relationship("Order", back_populates="user")class Order(Base):__tablename__ = 'orders'id = Column(Integer, primary_key=True)user_id = Column(Integer, ForeignKey('users.id'))product = Column(String)quantity = Column(Integer)user = relationship("User", back_populates="orders")

现在,如果我们想要查询所有下过订单的用户及其订单信息,我们可以进行连接查询:

from sqlalchemy.orm import joinedload
# 加载所有用户的订单信息
users_with_orders = session.query(User).options(joinedload(User.orders)).all()
for user in users_with_orders:print(f"User: {user.name}")for order in user.orders:print(f"  Order: {order.product}, Quantity: {order.quantity}")

3.2. 分组和聚合(Grouping and Aggregation)

假设我们想要统计每个用户下的订单总数。

from sqlalchemy import func
# 按用户分组,并计算每个用户的订单数量
order_count_by_user = session.query(User.id, User.name, func.count(Order.id).label('order_count')).\join(Order).group_by(User.id, User.name).all()
for user_id, user_name, order_count in order_count_by_user:print(f"User ID: {user_id}, Name: {user_name}, Order Count: {order_count}")

3.3. 子查询(Subquery)

如果我们想要找出订单数量超过平均订单数量的用户,我们可以使用子查询。

from sqlalchemy import func, select
# 计算平均订单数量作为子查询
avg_order_quantity = select([func.avg(Order.quantity).label('avg_quantity')]).select_from(Order).alias()
# 查询订单数量超过平均值的用户及其订单信息
users_above_avg = session.query(User, Order.product, Order.quantity).\join(Order).filter(Order.quantity > avg_order_quantity.c.avg_quantity).all()
for user, product, quantity in users_above_avg:print(f"User: {user.name}, Product: {product}, Quantity: {quantity}")

3.4. 复杂筛选条件(Complex Filtering)

假设我们想要找到名字以 “A” 开头的用户,并且他们的订单中包含 “apple” 这个产品。

# 查询名字以“A”开头的用户,且订单中包含“apple”产品的用户信息
users_with_apple = session.query(User).join(Order).\filter(User.name.startswith('A')).\filter(Order.product.contains('apple')).\distinct().all()  # 使用distinct() 确保结果中的用户不重复
for user in users_with_apple:print(f"User: {user.name}")

4. SQLAlchemy 实践1

4.1. 定义数据模型类

我们定义三个数据模型类:User(用户)、Post(文章)和Comment(评论)。这些类之间通过外键和关系进行关联。

# 导入 SQLAlchemy 中所需的模块
from sqlalchemy import create_engine, Column, Integer, String, Text, ForeignKey
from sqlalchemy.orm import declarative_base, relationship# 创建一个基类,用于定义数据模型的基本结构
Base = declarative_base()# 定义数据模型类 User,对应数据库中的 'users' 表
class User(Base):__tablename__ = 'users'# 定义列:id,是整数类型,作为主键id = Column(Integer, primary_key=True)# 定义列:username,是字符串类型,最大长度为50,唯一且不可为空username = Column(String(50), unique=True, nullable=False)# 定义列:email,是字符串类型,最大长度为100,唯一且不可为空email = Column(String(100), unique=True, nullable=False)# 定义关系,与 Post 类的关系为一对多关系,通过 back_populates 指定反向关系属性名posts = relationship('Post', back_populates='author')# 定义数据模型类 Post,对应数据库中的 'posts' 表
class Post(Base):__tablename__ = 'posts'# 定义列:id,是整数类型,作为主键id = Column(Integer, primary_key=True)# 定义列:title,是字符串类型,最大长度为100,不可为空title = Column(String(100), nullable=False)# 定义列:content,是文本类型,不可为空content = Column(Text, nullable=False)# 定义列:user_id,是整数类型,外键关联到 'users' 表的 id 列user_id = Column(Integer, ForeignKey('users.id'))# 定义关系,与 User 类的关系为多对一关系,通过 back_populates 指定反向关系属性名author = relationship('User', back_populates='posts')# 定义关系,与 Comment 类的关系为一对多关系,通过 back_populates 指定反向关系属性名comments = relationship('Comment', back_populates='post')# 定义数据模型类 Comment,对应数据库中的 'comments' 表
class Comment(Base):__tablename__ = 'comments'# 定义列:id,是整数类型,作为主键id = Column(Integer, primary_key=True)# 定义列:text,是文本类型,不可为空text = Column(Text, nullable=False)# 定义列:user_id,是整数类型,外键关联到 'users' 表的 id 列user_id = Column(Integer, ForeignKey('users.id'))# 定义列:post_id,是整数类型,外键关联到 'posts' 表的 id 列post_id = Column(Integer, ForeignKey('posts.id'))# 定义关系,与 User 类的关系为多对一关系author = relationship('User')# 定义关系,与 Post 类的关系为多对一关系,通过 back_populates 指定反向关系属性名post = relationship('Post', back_populates='comments')

4.2. 创建数据库引擎和会话

这里我们选择了 SQLite 数据库,并使用 create_all 创建相应的表。

# 导入 SQLAlchemy 中所需的模块
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker# 创建一个 SQLite 数据库引擎,连接到名为 'blog.db' 的数据库文件
engine = create_engine('sqlite:///blog.db')# 使用 Base 对象的 metadata 属性,创建数据库中定义的所有表
Base.metadata.create_all(engine)# 使用 sessionmaker 创建一个会话类 Session,并将其绑定到上面创建的数据库引擎
Session = sessionmaker(bind=engine)# 创建一个实例化的会话对象 session,用于进行数据库操作
session = Session()

4.3. 进行数据库操作

这段代码演示了如何使用 SQLAlchemy 对数据库进行插入和查询操作。首先,创建了一个用户、一篇文章和一条评论,然后通过查询用户的方式,打印出该用户的所有文章及评论。

# 创建一个新用户对象并设置其属性
user1 = User(username='john_doe', email='john@example.com')# 将新用户对象添加到会话,表示要进行数据库插入操作
session.add(user1)# 提交会话,将所有在此会话中的数据库操作提交到数据库
session.commit()# 创建一篇新文章对象并设置其属性
post1 = Post(title='Introduction to SQLAlchemy', content='SQLAlchemy is a powerful ORM for Python.')# 将文章的作者关联到之前创建的用户
post1.author = user1# 将新文章对象添加到会话,表示要进行数据库插入操作
session.add(post1)# 提交会话,将所有在此会话中的数据库操作提交到数据库
session.commit()# 创建一条新评论对象并设置其属性
comment1 = Comment(text='Great article!', author=user1, post=post1)# 将评论对象添加到会话,表示要进行数据库插入操作
session.add(comment1)# 提交会话,将所有在此会话中的数据库操作提交到数据库
session.commit()# 查询用户名为 'john_doe' 的用户,并打印其所有文章及评论
user = session.query(User).filter_by(username='john_doe').first()
print(f"User: {user.username}")# 遍历用户的所有文章
for post in user.posts:print(f"Post: {post.title}")# 遍历文章的所有评论for comment in post.comments:print(f"Comment: {comment.text}")

5. SQLAlchemy 实践2 数据库初始化操作

main.py

from sqlalchemy import create_engine
from sqlalchemy_utils import create_database, database_existsfrom sqlalchemy import and_, or_, func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import SingletonThreadPoolfrom database import Base
from database import TUsersdef is_db_exist(db_url: str):engine = create_engine(db_url, max_overflow=0, pool_size=16, pool_timeout=5, pool_recycle=-1)if not database_exists(engine.url):create_database(engine.url)return Falseelse:    return Truedef init_database(db_url: str):# SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:password@localhost:3306/location_db"# # SQLALCHEMY_DATABASE_URL = "sqlite:///./location_db.db"# 设置连接池的大小engine = create_engine(db_url, max_overflow=0,         # 超过连接池大小外最多创建的连接pool_size=16,           # 连接池大小pool_timeout=5,         # 池中没有线程最多等待的时间,否则报错pool_recycle=-1         # 多久之后对线程池中的线程进行一次连接的回收(重置))# 创建数据库Base.metadata.create_all(engine)# # 创建数据库连接# _session = sessionmaker(autocommit=False, autoflush=False, bind=engine)()if __name__ == '__main__':# 数据库参考db_host = "127.0.0.1"db_user = "root"db_password = "********"db_url = '%s%s:%s@%s/%s' % ("mysql+pymysql://", db_user, db_password, db_host, "local_db")if not is_db_exist(db_url):init_database(db_url)from account import AlchemyToolalchemytool = AlchemyTool(db_url=db_url)alchemytool.create_user()

database.py

from sqlalchemy import Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime, timedelta# sqlalchemy
Base = declarative_base()class TUsers(Base):__tablename__ = 'users'user_id     = Column(String(32), primary_key=True, index=True)user_name   = Column(String(32), nullable=False)def to_dict(self):return {c.name: getattr(self, c.name, None) for c in self.__table__.columns}

account.py

from sqlalchemy import and_, or_, func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import SingletonThreadPoolfrom database import Base
from database import TUsersclass AlchemyTool(object):def __init__(self, db_url: str):print("AlchemyTool init")# 设置连接池的大小db_config = {"pool_size": 10}#engine = create_engine(SQLALCHEMY_DATABASE_URL, **db_config)# engine = create_engine(db_url, #         #poolclass = SingletonThreadPool,#         connect_args = {'check_same_thread': False}#     )engine = create_engine(db_url, max_overflow=0, pool_size=16, pool_timeout=5, pool_recycle=-1)# # 创建数据库# Base.metadata.create_all(engine)# 创建数据库连接self.__session = sessionmaker(autocommit=False, autoflush=False, bind=engine)()# ############################### User ###############################def create_user(self):try:# db_location = TUsers(**user_base.dict(), user_id=user_id, creator=creator, editor=editor, state=state)db_location = TUsers(user_id="id2222", user_name="name")self.__session.add(db_location)self.__session.commit()except SQLAlchemyError as e:print(f"Error.AlchemyTool.create_user SQLAlchemyError:{str(e)}")self.__session.rollback()db_location = Nonefinally:pass

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/465018.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker安装MongoDB详解(mongo.latest)

一、MongoDB介绍 MongoDB是一种基于分布式文件存储的数据库,使用C语言开发,旨在为Web应用提供可扩展且高性能的数据存储解决方案。作为一种介于关系数据库和非关系数据库之间的技术,MongoDB具有强大的功能和高效的性能,特别适用于…

金箍棒变化-第15届蓝桥杯国赛Scratch初/中级组真题第1题

[导读]:超平老师的《Scratch蓝桥杯真题解析100讲》已经全部完成,后续会不定期解读蓝桥杯真题,这是Scratch蓝桥杯真题解析第193讲。 如果想持续关注Scratch蓝桥真题解读,可以点击《Scratch蓝桥杯历年真题》并订阅合集,…

简单的 docker 部署ELK

简单的 docker 部署ELK 这是我的运维同事部署ELK的文档,我这里记录转载一下 服务规划 架构: Filebeat->kafka->logstash->ES kafka集群部署参照: kafka集群部署 部署服务程序路径/数据目录端口配置文件elasticsearch/data/elasticsearch9200/data/elas…

Unity XR Interaction Toolkit 开发教程(3)快速配置交互:移动、抓取、UI交互【3.0以上版本】

获取完整课程以及答疑,工程文件下载: https://www.spatialxr.tech/ 视频试看链接: 3.快速配置交互:移动、抓取、UI交互【Unity XR Interaction Toolkit 跨平台开发教程】(3.0以上版本) 系列教程专栏&…

深度体验SCNet超算平台:SCNet「AI跃升季」·谁是下一个“AI”跃人?

平时做大模型训练的时候总是苦于没有服务器资源来做微调实验,于是这次深度体验了一下SCNet超算平台。 SCNet超算平台是一个超算互联网计算服务平台,有着更大更全更专业的超级算力。显卡从异构加速卡到A800都有。 本次我尝试了大模型的推理和微调。 第一…

求助帖【如何学习核磁共振的原理】

最近提前进组了 我完全不懂磁共振的相关知识 想问问各位大佬有没有推荐的学习路线 或者是学习资料、论坛都可以的(我做的方向是磁共振成像技术) 老师给了一本书,但是有点看不懂,全英文的 叫Principles Of Magnetic Resonance …

MySQL查询where中包含多个in条件问题

示例: select * from x_table where a in (1,2,3) and b in (4,8) 上面这种查询方法,如果可以通过a和b唯一确定一条数据,但a和b列可以有相同值时,会造成查询数据不准确。 验证: 假设有以下数据(手机号为…

HiveSQL 中判断字段是否包含某个值的方法

HiveSQL 中判断字段是否包含某个值的方法 在 HiveSQL 中,有时我们需要判断一个字段是否包含某个特定的值。下面将介绍几种常用的方法来实现这个功能。 一、创建示例表并插入数据 首先,我们创建一个名为employee的表,并插入一些示例数据&am…

python-读写Excel:openpyxl-(4)下拉选项设置

使用openpyxl库的DataValidation对象方法可添加下拉选择列表。 DataValidation参数说明: type: 数据类型("whole", "decimal", "list", "date", "time", "textLength", "custom"…

求平面连接线段组成的所有最小闭合区间

这个功能确实非常实用,我在过去开发地面分区编辑器时就曾应用过这一算法。最近,在新产品的开发中再次遇到了类似的需求。尽管之前已经实现过,但由于长时间未接触,对算法的具体细节有所遗忘,导致重新编写时耗费了不少时…

springboot - 定时任务

定时任务是企业级应用中的常见操作 定时任务是企业级开发中必不可少的组成部分,诸如长周期业务数据的计算,例如年度报表,诸如系统脏数据的处理,再比如系统性能监控报告,还有抢购类活动的商品上架,这些都离不…

ES管理工具Cerebro 0.8.5 Windows版本安装及启动

前言: Cerebro 的下载地址 https://github.com/lmenezes/cerebro/releases Cerebro 默认监听IP 0.0.0.0 ,默认端口9000,访问地址:http://localhost:9000 启动 cmd命令到安装目录下:cerebro-0.8.5\bin 执行命令 ce…

Flutter 正在切换成 Monorepo 和支持 workspaces

其实关于 Monorepo 和 workspaces 相关内容在之前《Dart 3.5 发布,全新 Dart Roadmap Update》 和 《Flutter 之 ftcon24usa 大会,创始人分享 Flutter 十年发展史》 就有简单提到过,而目前来说刚好看到 flaux 这个新进展,所以就再…

[论文][环境]3DGS+Colmap环境搭建_WSL2_Ubuntu22.04 - 副本

0. 前言 仅使用Ubuntu进行场景编译,场景渲染查看则使用Windows下官方提供的编译好的预编译包打开即可,非常方便(要注意即使是预编译版本,Windows端也应该安装VS和CUDA Toolkit,要注意的是,最新的SIBR预编译…

json-server的使用(根据json数据一键生成接口)

一.使用目的 在前端开发初期,后端 API 可能还未完成,json-server 可以快速创建模拟的 RESTful API,帮助前端开发者进行开发和测试。 二.安装 npm install json-server //局部安装npm i json-server -g //全局安装 三.使用教程 1.准备一…

导入和部署自定义 LLM 大模型

本文以【Qwen2-7B-Instruct】模型为例,指导如何将自定义大模型导入到 TI 平台,并使用平台内置推理镜像部署大模型对话推理服务。 前置要求 申请 CFS 本文所涉及到的操作需要通过 CFS 存储模型文件,详情请查看创建文件系统及挂载点。 操作…

开源办公软件 ONLYOFFICE 深入探索

文章目录 引言1. ONLYOFFICE 创建的背景1. 1 ONLYOFFICE 项目启动1. 2 ONLYOFFICE 的发展历程 2. 核心功能介绍2. 1 桌面编辑器2. 1. 1 文档2. 1. 2 表格2. 1. 3 幻灯片 2. 2 协作空间2. 3 文档编辑器 - 本地部署版 3. 技术介绍4. 安装5. 优势与挑战6. 个人体验7. 强大但不止于…

HTTP慢速攻击原理及解决办法

目录 引言 HTTP慢速攻击原理 解决办法 Nginx Tomcat 华宇TAS IIS 结论 引言 HTTP慢速攻击(Slow HTTP Attack)是一种拒绝服务攻击(DoS),攻击者通过故意缓慢地发送HTTP请求来耗尽服务器资源,导致合法…

[mysql]修改表和课后练习

目录 DDL数据定义语言 添加一个字段 添加一个字段到最后一个 添加到表中的第一个一个字段 选择其中一个位置: 修改一个字段:数据类型,长度,默认值(略) 重命名一个字段 删除一个字段 重命名表 删除表 清空表 DCL中事务相关内容 DCL中COMMIT和ROLLBACK的讲解 对比TR…

SpringBoot+ClickHouse集成

前面已经完成ClickHouse的搭建&#xff0c;创建账号&#xff0c;创建数据库&#xff0c;保存数据库等&#xff0c;接下来就是在SpringBoot项目中集成ClickHouse。 一&#xff0c;引入依赖 <!-- SpringBoot集成ClickHouse --> <dependency><groupId>com.baom…