SQLAlchemy 使用封装实例

类封装

database.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-import sys
import json
import logging
from datetime import datetimefrom core.utils import classlock, parse_bool
from core.config import (MYSQL_HOST,MYSQL_PORT,MYSQL_USER,MYSQL_PASS,MYSQL_DATABASE,MYSQL_TIMEOUT
)from sqlalchemy import create_engine, Column, desc, not_, func
from sqlalchemy import Integer, String, Boolean, DateTime, Text, Enum     # Text存储大不固定长的字符串
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyErrorBase = declarative_base()log = logging.getLogger("log")SCHEMA_VERSION = "1.0.0"class User(Base):__tablename__ = "user"id = Column(Integer(), primary_key=True)file_size = Column(Integer(), nullable=False)      # nullable 不可为空md5 = Column(String(32), nullable=False)crc32 = Column(String(8), nullable=False)sha1 = Column(String(40), nullable=False)sha256 = Column(String(64), nullable=False)sha512 = Column(String(128), nullable=False)memory = Column(Boolean, nullable=False, default=False)ssdeep = Column(String(255), nullable=True)start_time = Column(DateTime(timezone=False), nullable=True, default=datetime.now)def __repr__(self):   # 查询返回的结果return "<User('{0}','{1}')>".format(self.id, self.sha256)def to_dict(self):"""将对象转换为dict.@return: dict"""d = {}for column in self.__table__.columns:d[column.name] = getattr(self, column.name)return ddef to_json(self):"""将对象转换为JSON.@return: JSON data"""return json.dumps(self.to_dict())class Version(Base):"""用于确定实际数据库架构发布的表."""__tablename__ = "version"version_num = Column(String(32), nullable=False, primary_key=True)class Database(object):"""分析队列数据库此类处理为内部队列创建数据库用户经营它还提供了一些与之交互的功能"""def __init__(self, schema_check=True, echo=False):"""@param dsn: 数据库连接字符串.@param schema_check: 禁用或启用数据库架构版本检查.@param echo: echo sql 查询."""self._lock = Noneself.schema_check = schema_checkself.echo = echodef connect(self, schema_check=None, dsn=None, create=True):"""连接到数据库后端."""if schema_check is not None:self.schema_check = schema_checkif not dsn:dsn = "mysql://{0}:{1}@{2}:{3}/{4}".format(MYSQL_USER, MYSQL_PASS, MYSQL_HOST, MYSQL_PORT, MYSQL_DATABASE)#dsn = "mysql://{username}:{password}@{hostname}:{port}/{database}"self._connect_database(dsn)# 禁用SQL日志记录。打开它进行调试.self.engine.echo = self.echo# 连接超时.self.engine.pool_timeout = MYSQL_TIMEOUT# 获取数据库会话.self.Session = sessionmaker(bind=self.engine)if create:self._create_tables()def _create_tables(self):"""创建所有数据库表等."""try:Base.metadata.create_all(self.engine)except SQLAlchemyError as e:raise ("无法创建或连接到数据库: %s" % e)# 处理架构版本控制.# TODO: it's a little bit dirty, needs refactoring.tmp_session = self.Session()if not tmp_session.query(Version).count():# 设置数据库架构版本.tmp_session.add(Version(version_num=SCHEMA_VERSION))try:tmp_session.commit()except SQLAlchemyError as e:raise ("无法设置架构版本: %s" % e)tmp_session.rollback()finally:tmp_session.close()else:# 检查数据库版本是否为预期版本.last = tmp_session.query(Version).first()tmp_session.close()if last.version_num != SCHEMA_VERSION and self.schema_check:log.warning("数据库架构版本不匹配:找到 %s,应为 %s.",last.version_num, SCHEMA_VERSION)log.error("(可选)进行备份,然后通过运行migrate应用最新的数据库迁移。")sys.exit(1)def __del__(self):"""断开连接池."""self.engine.dispose()def _connect_database(self, connection_string):"""连接到数据库.@param connection_string: 指定数据库的连接字符串"""try:if connection_string.startswith("sqlite"):# 使用“check_same_thread”在多个线程上禁用sqlite安全检查.self.engine = create_engine(connection_string, connect_args={"check_same_thread": False})elif connection_string.startswith("postgres"):# 禁用SSL模式以避免使用sqlalchemy和多进程时出现一些错误.# See: http://www.postgresql.org/docs/9.0/static/libpq-ssl.html#LIBPQ-SSL-SSLMODE-STATEMENTS# TODO 检查这是否仍然相关。特别是假设我们不再使用多处理.self.engine = create_engine(connection_string, connect_args={"sslmode": "disable"})else:self.engine = create_engine(connection_string)except ImportError as e:lib = str(e).split()[-1].strip("'")if lib == "MySQLdb":log.error("缺少MySQL数据库驱动程序(在Linux上使用 `pip install mysql-python` 安装,或在Windows上使用 `pip-install mysqlclient`)")if lib == "psycopg2":log.error("缺少PostgreSQL数据库驱动程序 (使用 `pip install psycopg2`)")log.error("缺少未知的数据库驱动程序,无法导入 %s" % lib)sys.exit(-1)@classlockdef add_user(self, file_size, md5, crc32, sha1, sha256, sha512, memory, ssdeep=None):session = self.Session()# 将空字符串和None值转换为有效的int# if not timeout:#     timeout = 0# if not priority:#     priority = 1#try:memory = parse_bool(memory)except ValueError:memory = False## try:#     enforce_timeout = parse_bool(enforce_timeout)# except ValueError:#     enforce_timeout = Falseuser = User()user.file_size = file_sizeuser.md5 = md5user.crc32 = crc32user.sha1 = sha1user.sha256 = sha256user.sha512 = sha512user.memory = memoryuser.ssdeep = ssdeepsession.add(user)try:session.commit()except SQLAlchemyError as e:log.error("数据库添加 user 错误: {0}".format(e))return Falsefinally:session.close()return True@classlockdef select_user(self, id=None):session = self.Session()try:search = session.query(User)if id:search = search.filter_by(id=id)# 排序# search = search.order_by(id)# search = search.order_by(desc(User.id))  倒叙tasks = search.all()return tasksexcept SQLAlchemyError as e:log.error("数据库查看所有 user 错误: {0}".format(e))return []finally:session.close()@classlockdef update_user(self, id, new_file_size):session = self.Session()try:search = session.query(User).filter(User.id == id).first()search.file_size = new_file_size# session.query(User).filter(User.id == 1).update({'file_size': 'new_file_size'})session.commit()except SQLAlchemyError as e:log.error("数据库更新 user 错误: {0}".format(e))return Falsefinally:session.close()return True@classlockdef delete_user(self, id):session = self.Session()try:search = session.query(User).filter(User.id == id).first()if search:session.delete(search)session.commit()except SQLAlchemyError as e:log.error("数据库删除 user 错误: {0}".format(e))return Falsefinally:session.close()return True

调用运行

merage.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-import time
import loggingfrom core.database import Databaselog = logging.getLogger("log")class Merge(object):def __init__(self):db = Database()db.connect()res = db.add_user(32, "11", "22", "33", "44", "55", "off")if not res:print("添加错误")print(db.select_user())# res = db.update_user(1, 50)# if not res:#     print("更新错误")# res = db.delete_user(2)# if not res:#     print("删除错误")if __name__ == '__main__':merge = Merge()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/155826.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CustomShapes/自定义形状, CustomCurves/自定义曲线, AnimateableData/数据变化动画 的使用

1. CustomShapes 自定义形状视图 1.1 资源图文件 therock.png 1.2 创建自定义形状视图 CustomShapesBootcamp.swift import SwiftUI/// 三角形 struct Triangle: Shape{func path(in rect: CGRect) -> Path {Path { path inpath.move(to: CGPoint(x: rect.midX, y: rect.mi…

Win10玩游戏老是弹回桌面的解决方法

在Win10电脑中&#xff0c;用户不仅可以办公&#xff0c;也可以畅玩各种各样的游戏。但是&#xff0c;有时候用户在玩游戏的时候&#xff0c;遇到了游戏老是自己弹回桌面的问题&#xff0c;这样是非常影响游戏体验的&#xff0c;却不清楚具体的解决方法。下面小编给大家带来了简…

二叉搜索树-- 删除节点-力扣 450 题

例题细节已经讲过&#xff08;二叉搜索树的基础操作-CSDN博客&#xff09;&#xff0c;用非递归和递归均可实现&#xff0c;这里只给出递归参考代码&#xff1a; public TreeNode deleteNode(TreeNode node, int key) {if (node null) {return null;}if (key < node.val) …

算法进阶——字符串的排列

题目 输入一个长度为 n 字符串&#xff0c;打印出该字符串中字符的所有排列&#xff0c;你可以以任意顺序返回这个字符串数组。 例如输入字符串ABC,则输出由字符A,B,C所能排列出来的所有字符串ABC,ACB,BAC,BCA,CBA和CAB。 数据范围&#xff1a;n<10 要求&#xff1a;空间复…

【JavaEE初阶】 死锁详解

文章目录 &#x1f38b;死锁的概念&#x1f333;死锁的三个典型情况&#x1f6a9;一个线程一把锁&#x1f6a9;两个线程两把锁&#x1f6a9;n个线程m把锁(哲学家就餐问题) &#x1f384;如何破除死锁&#x1f6a9;破坏循环等待 本文重点&#xff1a; 死锁咋回事 死锁的三个典型…

【SkyWalking】SkyWalking是如何实现跨进程传播链路数据?

文章目录 一、简介1 为什么写这篇文章2 跨进程传播协议-简介 二、协议1 Standard Header项2 Extension Header项3 Correlation Header项 三、跨进程传播协议的源码分析1 OpenTracing规范2 通过dubbo插件分析跨进程数据传播3 分析跨进程传播协议的核心源码 四、小结参考 一、简介…

亚马逊,速卖通,敦煌产品测评补单攻略:低成本、高安全实操指南

随着电商平台的发展和消费者对产品质量的要求提升&#xff0c;测评补单成为了商家们提升销售和用户口碑的关键环节。然而&#xff0c;如何在保持成本低廉的同时确保操作安全&#xff0c;一直是卖家们面临的挑战。今天林哥分享一些实用的技巧和策略&#xff0c;帮助卖家们产品的…

嵌入式C语言自我修养《内存堆栈管理》学习笔记

目录 一、Linux环境下的内存管理 二、栈的管理 三、堆内存管理 四、mmap映射区 五、内存泄漏与防范 六、常见的内存错误及检测 C程序中定义的函数、全局变量、静态变量经过编译链接后&#xff0c;分别以section的形式存储在可执行文件的代码段、数据段和BSS段中。当程序运…

【Zabbix】Zabbix学习笔记

现在Zabbix Server存在的问题&#xff1a; 问题1&#xff1a; Zabbix server: Utilization of discoverer processes over 75% 问题2&#xff1a; Zabbix server: Utilization of icmp pinger processes over 75% 优化的解决办法是修改配置文件把Discovery和Pinger进程数量调大…

04-RocketMQ源码解读

目录汇总&#xff1a;RocketMQ从入门到精通汇总 上一篇&#xff1a;03-RocketMQ高级原理 这一部分&#xff0c;我们开始深入RocketMQ的源码。源码的解读是个非常困难的过程&#xff0c;每个人的理解程度都会不一样&#xff0c;也不太可能通过讲解把其中的细节全部讲明白。我们今…

panads操作excel

panads简介 pandas是基于Numpy创建的Python包&#xff0c;内置了大量标准函数&#xff0c;能够高效地解决数据分析数据处理和分析任务&#xff0c;pandas支持多种文件的操作&#xff0c;比如Excel&#xff0c;csv&#xff0c;json&#xff0c;txt 文件等&#xff0c;读取文件之…

unity发布微信小游戏,未找到 game.json报错原因

unity发布微信小游戏&#xff0c;未找到 game.json报错原因 同一个问题相隔一年遇到两次&#xff0c;两次原因都不一样&#xff0c;记录一下&#xff0c;以后不要再掉坑里 原因一&#xff1a;申请的appID是小程序不是小游戏 解决方法&#xff1a;需要在程序平台修改服务类目 如…

哈希应用之布隆过滤器

文章目录 1.介绍1.1百度搜索1.2知乎好文1.3自身理解 2.模拟实现2.1文档阅读2.2代码剖析 3.误判率的研究4.布隆过滤器的应用4.1如何找到两个分别有100亿个字符串的文件的交集[只有1G内存].分别给出精确算法和近似算法4.2如何扩展BloomFilter使得它支持删除元素的操作 5.整体代码…

pytorch中nn.DataParallel多次使用

pytorch中nn.DataParallel多次使用 import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader# 定义模型 class MyModel(nn.Module):def __init__(self):super(MyModel, self).__init__()self.fc nn.Linear(10, 1)def forwa…

ROS为机器人装配激光雷达

移动机器人在环境中获取障碍物的具体位置、房间的内部轮廓等信息都是非常必要的&#xff0c;这些信息是机器人创建地图、进行导航的基础数据&#xff0c;除上面所讲的Kinect&#xff0c;还可以使用激光雷达作为这种场景应用下的传感器。 激光雷达可用于测量机器人和其他物体之间…

3.简单场景构建

在新建的项目中&#xff0c;默认存在 Main Camera 和 Directional Light两个对象。若是缺失&#xff0c;可通过选择菜单中的 Game Object->Camera 和 Geme Object->Light->Directional Light进行创建。 1.添加地形及底图 通过在Cesium面板中选择 Cesium World Terrai…

批量文件重命名软件 A Better Finder Rename 11汉化for mac

A Better Finder Rename 11是一款功能强大的文件重命名工具&#xff0c;可在Mac操作系统上使用。它提供了简单而直观的界面&#xff0c;帮助用户快速批量重命名文件和文件夹&#xff0c;提高文件管理和组织效率。 以下是A Better Finder Rename 11可能提供的一些主要功能和特点…

设计模式 - 结构型模式考点篇:适配器模式(类适配器、对象适配器、接口适配器)

目录 一、适配器模式 一句话概括结构式模式 1.1、适配器模式概述 1.2、案例 1.2.1、类适配器模式实现案例 1.2.2、对象适配器 1.2.3、接口适配器 1.3、优缺点&#xff08;对象适配器模式&#xff09; 1.4、应用场景 一、适配器模式 一句话概括结构式模式 教你将类和对…

多线程入门

1 创建线程 下面的程序&#xff0c;我们可以用它来创建一个 POSIX 线程&#xff1a; #include <pthread.h> pthread_create (myThread, attr, start_routine, arg) 在这里&#xff0c;pthread_create 创建一个新的线程&#xff0c;并让它可执行。下面是关于参数的说明…

QT自制软键盘 最完美、最简单、跟自带虚拟键盘一样

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 QT自制软键盘 最完美、最简单、跟自带虚拟键盘一样 Chapter1 QT自制软键盘 最完美、最简单、跟自带虚拟键盘一样一、本自制虚拟键盘特点二、windows打开系统自带软键盘三、让…