http - 无状态-无法记录是否已经登陆过
#会话保持 – session cookie
session – 保存一些在服务端
cookie – 保存一些数据在客户端
session在单独服务器D上保存,前面数个服务器A,B,C上去取就好了,业务解耦。—》》现在都是基于token的验证。
以上是基于BS架构
API授权
由服务端完全把控
三张表,api_token,app_permission,api_permission
多对多的关系:一个账号(服务端给的)可以有多个url权限,同一个url可以被多个账号授权。
models/user.py
from . import db#API授权表的模型
#多对多的关系 中间表
app_permission = db.Table("app_permission",db.Column("api_id",db.ForeignKey("api_token.id")),db.Column("permission_id",db.ForeignKey("api_permission.id")))
# api_token表
#存放的是授权密钥,以及授权id
class ApiToken(db.Model):__tablename__ = "api_token"id = db.Column(db.Integer, primary_key=True, autoincrement=True)appid = db.Column(db.String(128), nullable=False)secretkey = db.Column(db.String(128), nullable=False)#通过中间表去创建多对多的关系manage = db.relationship("ApiPermission", secondary=app_permission, backref="token")#存放的是授权的url
class ApiPermission(db.Model):__tablename__ = "api_permission"id = db.Column(db.Integer, primary_key=True, autoincrement=True)url = db.Column(db.String(128), nullable=False)method_type = db.Column(db.String(128), nullable=False)
新增了文件,记得一定要“用到”我们的项目,绑定,运行了没有?
在init文件里加入
from . import user
然后生效到数据库里
terminal
flask --app server:sq_app db migrate
flask --app server:sq_app db upgrade
在Python中,如果你想在不改变业务逻辑源代码的情况下添加额外功能,你可以使用以下几种方法:
使用装饰器:装饰器可以在不改变函数或方法代码的情况下,添加额外的功能。例如,你可以使用装饰器来记录函数执行的时间,或者验证函数的输入。
def timing_decorator(func): def wrapper(*args, **kwargs): start = time.time() result = func(*args, **kwargs) end = time.time() print(f"{func.__name__} took {end - start} seconds") return result return wrapper @timing_decorator
def my_function(): # your code here
使用继承:如果你想要在不改变类的方法的情况下添加额外功能,你可以创建一个新的类,继承自原始类,并在新的类中添加额外的方法。
class OriginalClass: # original methods here class NewClass(OriginalClass): def extra_method(self): # extra functionality here
使用Mixin:Mixin是一种设计模式,它允许你在不修改类的情况下,将额外的功能添加到类中。你可以创建一个Mixin类,其中包含你想要添加的额外方法,然后让这个类继承自原始类。
class MixinClass: def extra_method(self): # extra functionality here class OriginalClass: # original methods here class NewClass(OriginalClass, MixinClass): pass # NewClass now has all the methods of OriginalClass and MixinClass
使用AOP(面向切面编程):AOP是一种编程范式,它允许程序员将横切关注点(cross-cutting concerns)从它们所影响的业务逻辑中分离出来。这种范式在处理一些“在哪里执行代码”的问题时特别有用,比如日志记录、事务处理、安全检查等。Python的某些库如Aspectlib、Hybrid等支持这种范式。
使用Monkey Patching:Monkey Patching是一种动态(运行时)修改模块或类的技术。使用这种方法,你可以在不修改源代码的情况下添加或改变功能。然而,这种方法应该谨慎使用,因为它可能会导致代码难以理解和维护。
libs/auth.py
from flask import request
from models.user import ApiToken, ApiPermission
from hashlib import md5
# import timedef auth_required(func):def inner(*args, **kwargs):if api_auth():return func(*args, **kwargs)else:return "认证失败"return inner# api授权认证函数 -- 函数返回为真表示认证成功
# 哈希算法 -- md5单向加密
def api_auth():params = request.args # 客户端url传递过来的参数appid = params.get("appid")salt = params.get("salt") # 盐值sign = params.get("sign") # 签名timestamp = params.get("timestamp") # 时间戳# if time.time() - int(timestamp) > 600:# return Falseapi_token = ApiToken.query.filter_by(appid=appid).first()if not api_token:return False# 验证有没有此url和方法的权限# http://127.0.0.1:8000/v1/monitor GET# /v1/monitor getif not has_permission(api_token, request.path, request.method.lower()):return False# 获取数据库里的密钥secretkey = api_token.secretkey# 生成服务端的签名# 可以加上时间戳来防止签名被别人盗取,重复访问# user_sign = appid + salt + secretkeyuser_sign = appid + salt + secretkey ##+ timestampm1 = md5()m1.update(user_sign.encode(encoding="utf-8"))# 判断客户端传递过来的签名和服务端生成签名是否一致if sign != m1.hexdigest():# raise AuthFailExceptionreturn Falseelse:return Truedef has_permission(api_token, url, method):# 客户端请求的方法和url# get/v1/monitormypermission = method + url# 获取此api_token对象的所有url权限all_permission = [permission.method_type + permission.urlfor permission in api_token.manage]# ['get/v1/monitor', 'post/v1/monitor']if mypermission in all_permission:return Trueelse:return False
router/product_view/product_api.py
from libs.auth import auth_requiredclass ProductView(Resource):# @装饰器@auth_requireddef get(self, id = None):
api授权流程
1.客户端向服务端申请授权,服务端向客户端提供appid和secretkey,以及加密算法
2.客户端按照服务端提供的信息、算法,生成签名,请求时发送给服务端
3.服务端收到信息,验证是否成功
客户端请求代码libs/api_auth_clilent.py
import requests
import random
import hashlibdef calculate_sign(appid, salt, secretkey):user_sign = appid + salt +secretkeym1 = hashlib.md5()m1.update(user_sign.encode(encoding='utf-8'))return m1.hexdigest()length_of_string = random.randint(1, 10)
appid = "sc"
salt = "JD"
secretkey = "123456"sign =calculate_sign(appid, salt, secretkey)url = 'http://127.0.0.1:9000/v1/product'
params = {'appid': appid,'salt': salt,'sign': s ign
}
response = requests.get(url, params=params)
if response.status_code == 200:print(response.json())
else:print(f"请求失败,状态码:{response.status_code}")
运行结果:
为了更安全,可以加入时间戳
注册登录
models/user.py
"""
@date: 2023/9/16
@file: user
@author: Jiangda
@desc: test"""from . import db
from werkzeug.security import generate_password_hash
import datetime#API授权表的模型
#多对多的关系 中间表
app_permission = db.Table("app_permission",db.Column("api_id",db.ForeignKey("api_token.id")),db.Column("permission_id",db.ForeignKey("api_permission.id")))
# api_token表
#存放的是授权密钥,以及授权id
class ApiToken(db.Model):__tablename__ = "api_token"id = db.Column(db.Integer, primary_key=True, autoincrement=True)appid = db.Column(db.String(128), nullable=False)secretkey = db.Column(db.String(128), nullable=False)#通过中间表去创建多对多的关系manage = db.relationship("ApiPermission", secondary=app_permission, backref="token")#存放的是授权的url
class ApiPermission(db.Model):__tablename__ = "api_permission"id = db.Column(db.Integer, primary_key=True, autoincrement=True)url = db.Column(db.String(128), nullable=False)method_type = db.Column(db.String(128), nullable=False)class User(db.Model):__tablename__ = "userdb"id = db.Column(db.Integer, primary_key=True, autoincrement=True)username = db.Column(db.String(128), nullable = False)_password = db.Column("password", db.String(128), nullable=False)role = db.Column(db.Integer, default=0)add_time = db.Column(db.DateTime, default = datetime.datetime.now)# 属性包装装饰器:python内置装饰器# 作用:把方法当作属性一样使用,定义属性之前做一些检测、转换#@property #自动根据函数名生成两个装饰器 ==》password.setter password.deletedef password(self):return self._password# user.password() -->print(user.password)@password.setterdef password(self, value):self._password = generate_password_hash(value)@classmethod #类方法 第一个参数代表类本身def create_user(cls, username, password):user = cls() #创建实例对象user.username = usernameuser.password = password #调用password.setter装饰的函数,强制性要求存hash值db.session.add(user)db.session.commit()
接着,对数据进行校验(密码合法性什么的,有专门的库)
pip install wtforms DataRequired, Regexp, ValidationError
forms/user.py
from wtforms import Form, StringField
from wtforms.validators import DataRequired, Regexp, ValidationError
from models.user import Userclass Userform(Form):username = StringField(validators=[DataRequired()])password = StringField(validators=[DataRequired(), Regexp(r'\w{6,18}', message="密码不符合要求")])# 自定义验证器,验证用户名是否唯一
# 自定义检查字段 方法名:validate_你要检查的字段def validate_username(self, value):if User.query.filter_by(username = value.data).first():raise ValidationError("用户已存在")
修改router/user/user.py(做了数据校验的版本)
from flask import request
from . import user_bp
from flask_restful import Resource, Api
from models.user import User
from libs.response import generate_response
from forms.user import Userformapi = Api(user_bp)class UserRegister(Resource):
# post方法 -- json /v1/user# # 没做数据校验的版本(如下)# def post(self):# username = request.json.get("username")# password = request.json.get("password")## if username is not None:# if password is not None:# new_user = User.create_user(username, password)# return generate_response(msg="register success!")# else:# return generate_response(msg="register fail!", code=20)# else:# return generate_response(msg="no username", code=21)#做了数据校验的版本def post(self):#try:data = request.jsonform = Userform(data=data)if form.validate():User.create_user(username = data.get("username"),password = form.password.data)return generate_response(msg="注册成功", code=0)else:return generate_response(code=1, msg=form.errors)#except:#return generate_response(code=1, msg="注册失败!")api.add_resource(UserRegister,"/user")