前言:
工作需求,写一个接口,用Python来编写,我首先想到用flask小型框架来支撑,配置sqlalchemy来实现,但是在实现的过程中,发生循环导入问题
我想到用蓝图来解决此问题,但是仍然会出死循环的问题,网上找了很多资料,许多都是轻描淡写,可能不是很明白,为了能够帮助更多踩坑的人,分享一下自己的经历!!
了解导入循环的机制
报错代码:
cannot import name '......' from partially initialized module '.....'
Python的 死循环机制:
当模块之间相互导入时,两个或多个模块之间相互依赖,它们通过import
语句相互引用对方,而没有一个明确的导入顺序或者依赖管理来打破这种循环,形成了一个闭环,当尝试从任何一个模块中导入或执行函数时,Python 解释器将陷入无限循环的导入过程中
比如:A导入B,B导入A
模块A:module_a.py
import module_b def func_a(): print("Function A") module_b.func_b()
模块B:module_b.py
import module_a def func_b(): print("Function B") module_a.func_a()
比如:A导入C,B导入A,C导入B
模块A:test1.py
import test3def t1():print('test1')test3.t3()t1()
模块B:test2.py
import test1def t2():test1.t1()print("test2")
模块C:test3.py
import test2def t3():test2.t2()print("test3")
解决sqlalchemy集成flask导入死循环的问题
言归正传,要解决问题,要注意db实例化的位置,app导入的位置,蓝图导入的顺序!!!
解决方案:
1. 在cab_api_test/views/__init__.py 文件下,实例化db,函数封装app
3. 导入蓝图时,如果蓝图有引用db,位置需要在db的下方
2. 在models中,main语句下,导入app,如果在全局还是会发生循环问题
可能说的有些不清不楚,还是通过代码体现,就能明白啦,每步骤关键都有注释,注意好导入的顺序,和导入的位置,主要是app和db的位置,和模型创建用到的app导入问题!!
目录结构:
cab_api_test
├── app.py
├── gunicorn_config.py
├── logs
│ ├── cab_api_access.log
│ └── cab_api_error.log
├── requirements.txt
└── views
├── __init__.py
├── __pycache__
│ ├── __init__.py
│ ├── cab_api.py
│ ├── filed_heck.py
│ ├── logger.py
│ ├── models.py
│ └── session.py
cab_api_test/views/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemydb = SQLAlchemy()# 1. 实例化db时 要注意导入蓝图的顺序!!!
# 2. 如果蓝图中有引用db,在这导入的蓝图必须在db的下面,不然还是会报错
# 3. 我是用的蓝图是 cab_api.py
from views.cab_api import cab_apihost = '127.0.0.1'
port = 3306
user = 'root'
password = 'wq123456'
database = 'gf_test'def create_app():app = Flask(__name__)app.config['DEBUG'] = True# sqlalchemy 连接池配置app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = Falseapp.config['SQLALCHEMY_DATABASE_URI'] = f'mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4&autocommit=true'app.config['SQLALCHEMY_POOL_SIZE'] = 10app.config['SQLALCHEMY_POOL_TIMEOUT'] = 10app.config['SQLALCHEMY_MAX_OVERFLOW'] = 5app.config['SQLALCHEMY_POOL_PRE_PING'] = Trueapp.config['SQLALCHEMY_ECHO'] = False# 把app对象初始化到db中db.init_app(app=app)app.register_blueprint(cab_api, url_prefix='/api')return app
cab_api_test/views/models.py
import datetime
from uuid import uuid4
from sqlalchemy import Column, String, Text, DateTime# 这个db是全新的SQLAlchemy对象
from views import dbclass Cab_review(db.Model):__tablename__ = 'cab_review' # 表名# 写字段uuid = Column(String(100), nullable=False, primary_key=True, default=str(uuid4()).replace('-', ''))cab_number = Column(String(64), nullable=True, comment='工单号')review_item = Column(String(100), comment='项目名') # String(32) == varchar(32)review_info = Column(Text, comment='项目信息')review_link = Column(String(200), nullable=True, comment='项目链接')caller_id = Column(String(100), nullable=True, comment='调用方标识ID')caller_ip = Column(String(100), nullable=True, comment='调用方IP')updated_time = Column(DateTime, default=datetime.datetime.now, nullable=False, comment='更新时间')test_number = Column(String(64), nullable=True, comment='测试号')if __name__ == '__main__': # 导入时不执行下面的语句,只有执行当前文件的时候才会执行下面的代码# 注意要在main下面导入create_app,如果在全局导入,还是会发生循环导入的问题from views import create_appapp = create_app()# 先清空再创建db.drop_all()db.create_all(app=app)# 做一个简单的封装 db,使用with 语句
class Session:def __init__(self):self.db = Nonedef __enter__(self):self.db = db.sessionreturn self.dbdef __exit__(self, exc_type, exc_val, exc_tb):if self.db:self.db.close()
cab_api_test/views/cab_api.py
from flask import jsonify, request
import json, datetime
from uuid import uuid4
from views.logger import flask_logger
from views.filed_check import field_check
from flask import Blueprint
# Session 是我封装好的 db
from views.models import Cab_review, Sessioncab_api = Blueprint('cab_api', __name__)@cab_api.route('/', methods=['POST', ])
def main():# 传入的参数data = request.get_json()cno_test = data.get('cno_test')review_pro = data.get('review_pro')review_result = data.get('review_result')review_advice = data.get('review_advice')flask_logger.info(f'请求体:{data}')data['cno'] = '已废弃' if not data.get('cno') else data.get('cno')data['caller_ip'] = request.remote_addrdata['url'] = data.get('url') if data.get('url') else ''data['caller_id'] = data.get('caller_id') if data.get('caller_id') else '调用方未提供caller_id'# 校验字段ok, msg = field_check(cno_test, review_pro, review_result, review_advice)if not ok:flask_logger.info('-------------------------------------------------------------------------------------------------------')return jsonify(msg)# 查询cab_review中的数据with Session() as session:select_data = session.query(Cab_review).filter_by(test_number=cno_test,review_item=review_pro).all() # [queryset,queryset],数据为空则 []session.begin() # 开启事务if not select_data:try:cab_review = Cab_review(uuid=str(uuid4()).replace('-', ''),cab_number=data.get('cno'),review_item=data.get('review_pro').strip(),review_info=json.dumps({'review_result': data.get('review_result'),'review_advice': data.get('review_advice')},ensure_ascii=False),review_link=data.get('url'),caller_id=data.get('caller_id'),caller_ip=data.get('caller_ip'),test_number=data.get('cno_test'))session.add(cab_review)session.commit()flask_logger.info('数据插入成功')msg = '请求成功,数据已添加或更新'is_success = Trueexcept Exception as e:session.rollback()flask_logger.error(f'添加失败:{e}', exc_info=False)msg = '数据添加或更新失败,请联系管理员'is_success = Falseelse:try:session.query(Cab_review).filter_by(test_number=data.get('cno_test'),review_item=data.get('review_pro').strip()).update({'review_info': json.dumps({'review_result': data.get('review_result'), 'review_advice': data.get('review_advice')},ensure_ascii=False), 'review_link': data.get('url'),'caller_id': data.get('caller_id'),'caller_ip': data.get('caller_ip'), 'updated_time': datetime.datetime.now()})session.commit()flask_logger.info('数据更新成功')msg = '请求成功,数据已添加或更新'is_success = Trueexcept Exception as e:session.rollback()flask_logger.error(f'更新失败:{e}', exc_info=False)msg = '数据添加或更新失败,请联系管理员'is_success = Falsereview_name = ['评审项:' + review_pro, '评审结果:' + review_result, '评审意见:' + review_advice,'评审链接:' + data.get('url')]flask_logger.info('-------------------------------------------------------------------------------------------------------')return {'code': 200, 'success': is_success, 'msg': msg, 'data': '\n'.join(review_name)}@cab_api.route('/', methods=['GET', ])
def health_check():return 'ok'
这样就不会在发生导入死循环的问题啦!!!