Python中的数据库交互提供了高级API。但是,有时您可能需要执行原始SQL以提高效率或利用数据库特定的特性。本指南介绍在SQLAlchemy框架内执行原始SQL。
在SQLAlchemy中执行原生SQL
SQLAlchemy虽然以其对象-关系映射(ORM)功能而闻名,但也允许直接执行原始SQL语句。当您有复杂的查询、需要优化性能或利用数据库引擎特有的特性时,这可能是有益的。执行原始SQL为您提供了这样做的能力和灵活性。
要执行原始SQL,必须使用SQLAlchemy的Connection对象,该对象可以从Engine或Session上下文中获得。让我们通过渐进式示例探索在SQLAlchemy中执行原始SQL的一些常见模式。
执行简单SQL
要执行原始SQL,需要从引擎获得一个连接:
from sqlalchemy import create_engine
# Replace 'dialect+driver://username:password@host/dbname' with your actual database URI
db_engine = create_engine('dialect+driver://username:password@host/dbname')with db_engine.connect() as connection:result = connection.execute("SELECT * FROM my_table")for row in result:print(row)
这将打印出‘ my_table ’表结果集中的每一行。
参数化查询
出于安全原因和防止SQL注入攻击,永远不要简单地将变量直接插入到SQL字符串中。相反,使用命名参数或位置占位符:
with db_engine.connect() as connection:result = connection.execute("SELECT * FROM users WHERE username = :username", {'username': 'example_user'})user = result.fetchone()print(user)
在上面的示例中,“:username ”是一个占位符,可以被“ example_user ”安全地替换。
使用文本SQL
SQLAlchemy的text函数可以用来创建带有占位符的SQL表达式:
from sqlalchemy.sql import textsql = text("SELECT * FROM users WHERE username = :username")with db_engine.connect() as connection:result = connection.execute(sql, username='example_user')user = result.fetchone()print(user)
这里,文本函数用命名参数包装SQL,提供灵活性和注入预防。
执行插入、更新、删除
INSERT、UPDATE、DELETE等修改操作也可以用类似的方式执行:
# Inserting a new user
insert_sql = text("INSERT INTO users (username, email) VALUES (:username, :email)")with db_engine.connect() as connection:connection.execute(insert_sql, username='new_user', email='new_user@example.com')# Updating a user's email
update_sql = text("UPDATE users SET email = :email WHERE username = :username")with db_engine.connect() as connection:connection.execute(update_sql, email='updated_user@example.com', username='existing_user')# Deleting a user
delete_sql = text("DELETE FROM users WHERE username = :username")with db_engine.connect() as connection:connection.execute(delete_sql, username='obsolete_user')
处理事务
事务处理使用连接对象,在执行SQL语句之前首先开始一个事务。这确保了原子性:
with db_engine.connect() as connection:transaction = connection.begin()try:connection.execute(insert_sql, {...})connection.execute(update_sql, {...})transaction.commit()except:transaction.rollback()raise
这将插入和更新操作包装在一个事务中,该事务可以在失败时回滚。
执行存储过程
存储过程也可以通过原始SQL调用:
call_procedure_sql = text("CALL my_stored_procedure(:param)")with db_engine.connect() as connection:result = connection.execute(call_procedure_sql, param='value')for row in result:print(row)
使用SQLAlchemy Core进行复杂查询
除了简单的文本语句,SQLAlchemy的核心语言还可以将文本SQL与Python逻辑相结合:
from sqlalchemy.sql import select, table, columnt_user = table('users', column('username'), column('email'))
stmt = select([t_user]).where(t_user.c.username == 'example_user')with db_engine.connect() as connection:for row in connection.execute(stmt):print(row)
这个示例演示了如何使用SQLAlchemy Core构造从用户名匹配“example_user”的“users”表中进行选择。
访问本地数据库功能
最后,使用SQLAlchemy,在需要特定于数据库功能的情况下,您可以将原始SQL直接传递给底层DBAPI连接:
with db_engine.raw_connection() as raw_conn:cursor = raw_conn.cursor()cursor.execute("YOUR_VENDOR_SPECIFIC_SQL_HERE")results = cursor.fetchall()for result in results:print(result)cursor.close()
最后总结
本指南重点介绍了使用SQLAlchemy执行原始SQL的各种方法,从简单查询到复杂事务,甚至直接访问DB API功能。负责任地使用这些方法,始终将查询参数化以防止SQL注入,并记住尽可能利用SQLAlchemy健壮的ORM特性。