pymysql是python操作mysql的标准库,可以通过pip install快速导入pymysql包操作数据库
使用pymysql操作mysql
简单demo
import pymysql
connect = pymysql.connect(host="localhost",port=3306,user="root",password="root",database="my_database",# charset="utf8mb4"
)
cursor = connect.cursor()# 查询语句1
sql = "select * from user where name = %(name)s"
ret = cursor.execute(sql, {"name": "ls"})
# 查询语句2
sql = "select * from user where name = %s"
ret = cursor.execute(sql, "ls")
print(ret)result = cursor.fetchall()
print("result", result)cursor.close()
connect.close()
自定义SqlHelper
import pymysqlclass MySQLClient(object):def __init__(self, **kwargs):self.conn = pymysql.connect(**kwargs)self.cursor = self.conn.cursor()def query(self, sql, *args):try:rowcount = self.cursor.execute(sql, *args)return rowcountexcept Exception as e:raise edef update(self, sql, *args):self.cursor.execute(sql, *args)self.conn.commit()def insert(self, sql, *args):self.cursor.execute(sql, *args)self.conn.commit()def fetch_one(self, sql, *args):self.query(sql, *args)result = self.cursor.fetchone()return resultdef fetch_all(self, sql, *args):self.query(sql, *args)result = self.cursor.fetchone()return resultdef close(self):self.cursor.close()self.conn.close()config = {"host": "localhost","port": 3306,"user": "root","password": "root","database": "my_database",
}mysql_client = MySQLClient(**config)
sql = "select * from user where name=%s"
ret = mysql_client.fetch_one(sql, "ls")
print(ret)# mysql_client.close()
借助DButils创建数据库连接池
DButils模块可以通过创建数据库连接池,提升数据库操作性能;
实现思路:
- 定义SqlHelper类
- 通过
__init__
方法定义pool=PoolDB(**kwargs),_local=threading.local()- 定义
__enter__
获取connection与cursor和__exit__
关闭connection与cursor,可支持with 上下文操作- 为了保证每次获取的connection与cursor不会将之前的覆盖掉,引入threading.local进行保存;self._local = {thread_id: {“stack”: [(connection, cursor)]}}
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
from dbutils.pooled_db import PooledDB
from threading import localclass SqlHelper(object):def __init__(self):self.pool = PooledDB(creator=pymysql, # 使用链接数据库的模块maxconnections=5, # 连接池允许的最大连接数,0和None表示不限制连接数mincached=1, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建# maxcached=5, # 链接池中最多闲置的链接,0和None不限制blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制setsession=[], # 开始会话前执行的命令列表host='localhost',port=3306,user='root',password='root',database='my_database',charset='utf8')self._local = local()def open(self):connection = self.pool.connection()cursor = connection.cursor()return connection, cursordef close(self, cursor, conn):cursor.close()conn.close()def __enter__(self):conn, cursor = self.open()rv = getattr(self._local, "stack", None)if not rv:self._local.stack = [(conn, cursor)]else:self._local.stack.append((conn, cursor))return cursordef __exit__(self, exc_type, exc_val, exc_tb):rv = getattr(self._local, "stack", None)if not rv:# del self._local.stackreturnelif len(rv) == 1:conn, cursor = rv[-1]# del self._local.stackreturnelse:conn, cursor = rv.pop()cursor.close()conn.close()def fetchone(self, sql, *args):conn, cursor = self.open(self)try:rowcount = cursor.execute(sql, *args)ret = cursor.fetchone()return retexcept Exception as e:raisedef fetchall(self, sql, *args):conn, cursor = self.open(self)try:rowcount = cursor.execute(sql, *args)ret = cursor.fetchall()return retexcept Exception as e:raisedb = SqlHelper()sql = "select * from user"
with db as c1:ret = c1.execute(sql)print(ret)with db as c2:ret = c2.execute(sql)print(ret)
使用DButils的另一种写法
使用这种写法,每次都实例化SqlHelper,保证每次获取的connection和cursor不被覆盖
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
1. 定义全局变量POOL=pooledDB(**kwargs)
2. 每次用到db就实例化一次
"""
import pymysql
from dbutils.pooled_db import PooledDB
from threading import localpool = PooledDB(creator=pymysql, # 使用链接数据库的模块maxconnections=0, # 连接池允许的最大连接数,0和None表示不限制连接数mincached=1, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建# maxcached=5, # 链接池中最多闲置的链接,0和None不限制blocking=False, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制setsession=[], # 开始会话前执行的命令列表host='localhost',port=3306,user='root',password='root',database='my_database',charset='utf8')class SqlHelper(object):def __init__(self):self.conn = Noneself.cursor = Nonedef open(self):self.connection = pool.connection()self.cursor = self.connection.cursor()return self.connection, self.cursordef close(self):self.cursor.close()self.conn.close()def __enter__(self):self.conn, self.cursor = self.open()return self.cursordef __exit__(self, exc_type, exc_val, exc_tb):self.close()db = SqlHelper()sql = "select * from user"
with db as c1:ret = c1.execute(sql)print("c1.cursor: ", db.cursor)print(ret)with db as c2:ret = c2.execute(sql)print("c2.cursor: ", db.cursor) # 一个实例对象是可以多次调用enter方法的,但db.cursor发生了改变,即上一次的连接丢了print(ret)print(type(c1), type(c2))print(c1 is c2) # falseprint("c1.cursor: ", db.cursor) # c2.cursor将c1.cursor覆盖了