目录
一、环境
二、MySQL的连接和使用
2.1方式一:sql为主
2.1.1创建连接
2.1.2 表结构
2.1.3 新增数据
编辑
2.1.4 查看数据
编辑
2.1.5 修改数据
2.1.6 删除数据
2.2方式二:orm对象关系映射
2.2.1 mysql连接
2.2.2 创建表
2.2.3 新增数据
编辑
2.2.4 查询数据
2.2.5 修改数据
编辑
2.2.6 删除数据
一、环境
工作中需要用到python和mysql数据库,本次文档记录相关操作。
环境:windows10、python 3.11.7
mysql版本:5.7
二、MySQL的连接和使用
本人使用过的两种方式
2.1方式一:sql为主
2.1.1创建连接
import sqlalchemy
from sqlalchemy.orm import scoped_session, sessionmakerUSERNAME = "root" # 用户名
PASSWORD = "123456" # 密码
ADDR = "localhost" # 连接地址 本地localhost 服务器就是服务器的地址XXX
PORT = "3306" # mysql端口号
DATABASE = "test" # 连接的数据库名class MysqlSession:def __init__(self):self.create_connection()def create_connection(self):engine = sqlalchemy.create_engine(f"mysql+pymysql://{USERNAME}:{PASSWORD}@{ADDR}:{PORT}/{DATABASE}?charset=utf8",max_overflow=50, # 超过连接池大小外最多创建的连接pool_size=50, # 连接池大小pool_recycle=30 # 多久之后对线程池中的线程进行一次连接的回收(重置),, echo=False)self.connection = scoped_session(sessionmaker(bind=engine))def get_connection(self):return self.connectiondef ins(self, sql): # 新增数据return self.connection.execute(sql).lastrowiddef arr(self, sql): # 查询多条数据return self.connection.execute(sql).fetchall()def obj(self, sql): # 查询单个数据return self.connection.execute(sql).fetchone()def upd(self, sql): # 修改单个数据return self.connection.execute(sql)def dlt(self, sql): # 删除数据return self.connection.execute(sql)def commit(self):self.connection.commit() # 提交self.connection.remove() # 结束会话
2.1.2 表结构
2.1.3 新增数据
mysql_session = MysqlSession()
connection = mysql_session.get_connection()
table_name = 'test.user' # 表名(这里是数据库名+表名)
# 新增数据,返回的该条数据的id
name_remark = [{'name': 'Alice', 'remark': '可能是个女生'},{'name': 'Bob', 'remark': "可能是个男生"},{'name': 'Tammi'}
]
new_ids = []
for p in name_remark:name = p.get('name')remark = p.get('remark')sql = f"""insert into {table_name} (name,remark) values ('{name}','{remark}')"""user_id = mysql_session.ins(sql)new_ids.append(user_id)
print(new_ids)
mysql_session.commit()
2.1.4 查看数据
# 查看所有数据
sql = f"""select * from {table_name}"""
users = mysql_session.arr(sql)
for u in users:print(u)
# 查看指定数据
sql = f"""select * from {table_name} where name='Alice'"""
user = mysql_session.obj(sql)
print(user)
2.1.5 修改数据
# 修改指定数据
sql = f"""update {table_name} set name='Alice_changed' where id=16"""
mysql_session.upd(sql)
mysql_session.commit()
# 查看刚才修改的数据
sql = f"""select * from {table_name} where id=16 """
user = mysql_session.obj(sql)
print(user)
2.1.6 删除数据
# 删除指定数据
sql = f"""delete from {table_name} where id=16"""
mysql_session.dlt(sql)
mysql_session.commit()
# 查看所有数据
users = show_test(mysql_session, table_name)
print(users)
2.2方式二:orm对象关系映射
因为现目前工作中没有用到这个(以前用django的时候有用到过orm),这里就简单记录一下测试情况。
2.2.1 mysql连接
mysql_session.py
import sqlalchemy
from sqlalchemy.orm import sessionmakerUSERNAME = "root" # 用户名
PASSWORD = "123456" # 密码
ADDR = "localhost" # 连接地址 本地localhost 服务器就是服务器的地址XXX
PORT = "3306" # mysql端口号
DATABASE = "test" # 连接的数据库名class MysqlSession:def __init__(self):self.create_connection()def create_connection(self):self.engine = sqlalchemy.create_engine(f"mysql+pymysql://{USERNAME}:{PASSWORD}@{ADDR}:{PORT}/{DATABASE}?charset=utf8",max_overflow=50, # 超过连接池大小外最多创建的连接pool_size=50, # 连接池大小pool_recycle=30, # 多久之后对线程池中的线程进行一次连接的回收(重置),echo=False)Session = sessionmaker(bind=self.engine)self.session = Session()def get_session(self):return self.sessiondef get_engine(self):return self.engine
2.2.2 创建表
models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_basefrom mysql_session import MysqlSessionBase = declarative_base()class User(Base):__tablename__ = 'person'id = Column(Integer, primary_key=True, autoincrement=True) # 主键 自增name = Column(String(255))age = Column(Integer)engine = MysqlSession().get_engine()
Base.metadata.create_all(engine) #创建上面的表,运行一次即可
2.2.3 新增数据
from models import Person
from mysql_session import MysqlSessionsession = MysqlSession().get_session()# 新增几条数据
new_user = Person(name='张三', age=30)
session.add(new_user)
new_user = Person(name='李四', age=40)
session.add(new_user)
new_user = Person(name='王五', age=50)
session.add(new_user)
session.commit()
2.2.4 查询数据
# 查询所有数据
users = session.query(Person).all()
for user in users:print(user.name, user.age)
print("**************************************")
# 指定信息
user = session.query(Person).filter_by(name='李四').first()
print(user.name, user.age)
2.2.5 修改数据
# 修改数据
user = session.query(Person).filter_by(name='李四').first()
if user:user.age = 400 # 将年龄修改为400session.commit()print("updated success.")print(user.name, user.age)
else:print("not found.")
2.2.6 删除数据
# 删除所有年龄大于100的用户
users = session.query(Person).filter(Person.age > 100).all()
for u in users:print(u.name, u.age)
for user in users:session.delete(user)
session.commit()