# -- coding: utf-8 --
# @Time : 2023/2/28 22:16
# @File : models.py
from werkzeug.security import check_password_hash, generate_password_hash
from app import db
"""
user - dept:一对一
user - rose:多对多
permission - rose:多对多
"""
# 用户模型
class User(db.Model):
__tablename__ = 'admin_user'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(32), unique=True)
password_hash = db.Column(db.String(128))
enable = db.Column(db.Integer, default=0, comment='启用')
dept_id = db.Column(db.Integer, comment='部门id') # 一对一
role = db.relationship('Role', secondary="admin_user_role", backref=db.backref('user'), lazy='dynamic')
@staticmethod
def generate_hash(password):
return generate_password_hash(password)
@staticmethod
def verify_hash(password, hash):
return check_password_hash(hash, password)
def get_id(self):
return str(self.id)
@classmethod
def find_by_username(cls, username):
return cls.query.filter_by(username=username).first()
def create(self):
db.session.add(self)
db.session.commit()
return self
# 创建中间表:用户和角色多对多
user_role = db.Table(
"admin_user_role", # 中间表名称
db.Column("id", db.Integer, primary_key=True, autoincrement=True, comment='标识'), # 主键
db.Column("user_id", db.Integer, db.ForeignKey("admin_user.id"), comment='用户编号'), # 属性 外键
db.Column("role_id", db.Integer, db.ForeignKey("admin_role.id"), comment='角色编号'), # 属性 外键
)
# 角色模型
class Role(db.Model):
__tablename__ = 'admin_role'
id = db.Column(db.Integer, primary_key=True, comment='角色ID')
name = db.Column(db.String(255), comment='角色名称')
code = db.Column(db.String(255), comment='角色标识')
enable = db.Column(db.Integer, comment='是否启用')
remark = db.Column(db.String(255), comment='备注')
details = db.Column(db.String(255), comment='详情')
sort = db.Column(db.Integer, comment='排序')
permissions = db.relationship('Permission', secondary="admin_role_permission", backref=db.backref('role'))
def create(self):
db.session.add(self)
db.session.commit()
return self
# 创建中间表:角色和权限多对多
role_power = db.Table(
"admin_role_permission", # 中间表名称
db.Column("id", db.Integer, primary_key=True, autoincrement=True, comment='标识'), # 主键
db.Column("permission_id", db.Integer, db.ForeignKey("admin_permission.id"), comment='用户编号'), # 属性 外键
db.Column("role_id", db.Integer, db.ForeignKey("admin_role.id"), comment='角色编号'), # 属性 外键
)
# 权限表
class Permission(db.Model):
__tablename__ = 'admin_permission'
id = db.Column(db.Integer, primary_key=True, comment='权限编号')
name = db.Column(db.String(255), comment='权限名称')
type = db.Column(db.String(1), comment='权限类型')
code = db.Column(db.String(30), comment='权限标识')
url = db.Column(db.String(255), comment='权限路径')
open_type = db.Column(db.String(10), comment='打开方式')
parent_id = db.Column(db.Integer, comment='父类编号')
icon = db.Column(db.String(128), comment='图标')
sort = db.Column(db.Integer, comment='排序')
enable = db.Column(db.Integer, comment='是否开启')
def create(self):
db.session.add(self)
db.session.commit()
return self
# 部门表:部门和用户:一对一
class Dept(db.Model):
__tablename__ = 'admin_dept'
id = db.Column(db.Integer, primary_key=True, comment="部门ID")
parent_id = db.Column(db.Integer, comment="父级编号")
dept_name = db.Column(db.String(50), comment="部门名称")
sort = db.Column(db.Integer, comment="排序")
leader = db.Column(db.String(50), comment="负责人")
phone = db.Column(db.String(20), comment="联系方式")
email = db.Column(db.String(50), comment="邮箱")
status = db.Column(db.Integer, comment='状态(1开启,0关闭)')
remark = db.Column(db.Text, comment="备注")
def create(self):
db.session.add(self)
db.session.commit()
return self
# -- coding: utf-8 --
# @Time : 2023/2/28 22:22
# @File : routes.py
import json
import traceback
from flask import jsonify, make_response, request
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity
from flask_restful import Api, Resource, abort
from app import db
from app.rbac import rbac_bp
from app.rbac.models import User, Role, Dept, Permission
from app.rbac.schema import UserSchema, DeptSchema, RoleSchema, PermissionSchema
from app.utils.rights import authorize
from app.utils.responses import response_with
from app.utils import responses as resp
api = Api(rbac_bp)
@rbac_bp.route("/init_db/")
def init_db():
# 创建数据库
db.create_all()
return response_with(resp.SUCCESS_200, message="init table success.")
@rbac_bp.route('/login/', methods=["POST"])
def login():
"""登录"""
try:
data = json.loads(request.get_data())
user_obj = User.find_by_username(data['username'])
if not user_obj:
return response_with(resp.NOT_FOUND_HANDLER_404, message="user not found.")
# 登录成功,需要 生成 jwt字符串
if User.verify_hash(data['password'], user_obj.password_hash):
# 用户权限
user_permissions = []
for role in user_obj.role:
# 未启用
if role.enable == 0:
continue
for p in role.permissions:
if p.enable == 0:
continue
user_permissions.append(p.code)
payload = {
'user_id': user_obj.id,
'username': user_obj.username,
'roles': [r.name for r in user_obj.role],
'permissions': user_permissions
}
print(f"payload: {
payload}")
"""
{'user_id': 3, 'username': 'admin', 'roles': ['管理员'],
'permissions': ['admin:user:main', 'admin:permission:main', 'admin:permission:add', 'admin:permission:edit',
'admin:permission:remove', 'admin:role:main', 'admin:role:add', 'admin:role:edit', 'admin:role:remove',
'admin:user:add', 'admin:user:edit', 'admin:user:remove', 'admin:dept:main', 'admin:dept:add',
'admin:dept:edit', 'admin:dept:remove']}
"""
access_token = create_access_token(identity=payload)
return response_with(resp.SUCCESS_201, message=f"{
user_obj.username} login success.",
value={
"access_token": access_token})
else:
return response_with((resp.UNAUTHORIZED_401))
except Exception as e:
print(traceback.format_exc())
return response_with(resp.INVALID_INPUT_422)
class UserListResource(Resource):
"""
用户
"""
@authorize("admin:user:main")
def get(self):
users = User.query.all()
return UserSchema(many=True, only=("id", "username", "dept_id", "role")).dump(users)
@authorize("admin:user:add")
def post(self):
json_data = json.loads(request.get_data())
if not json_data:
return response_with(resp.BAD_REQUEST_400, message="No input data provided.")
try:
json_data['password_hash'] = User.generate_hash(json_data.pop('password', None))
print(json_data)
# user = user_schema.load(json_data)
# user.create()
user = User(**json_data)
db.session.add(user)
db.session.commit()
except Exception as e:
db.session.rollback()
print(traceback.format_exc())
return response_with(resp.BAD_REQUEST_400, message="Could not create user. Error: {}".format(str(e)))
return response_with(resp.SUCCESS_201, message="User create success.")
class DeptListResource(Resource):
"""
部门
"""
@authorize("admin:dept:main")
def get(self):
depts = Dept.query.all()
return DeptSchema(many=True, only=("id", "dept_name")).dump(depts)
@authorize("admin:dept:add")
def post(self):
json_data = json.loads(request.get_data())
if not json_data:
return response_with(resp.BAD_REQUEST_400, message="No input data provided.")
try:
dept = DeptSchema().load(json_data)
print(json_data)
print(dept)
dept.data.create()
except Exception as e:
db.session.rollback()
print(traceback.format_exc())
return response_with(resp.BAD_REQUEST_400, message="Could not create dept. Error: {}".format(str(e)))
return response_with(resp.SUCCESS_201, message="Dept create success.")
class PermissionListResource(Resource):
"""
权限
"""
@authorize("admin:permission:main")
def get(self):
permissions = Permission.query.all()
return PermissionSchema(many=True,
only=("id", "name", "code", "type", "url", "enable", "open_type", "parent_id")).dump(
permissions)
@authorize("admin:permission:add")
def post(self):
json_data = json.loads(request.get_data())
if not json_data:
return response_with(resp.BAD_REQUEST_400, message="No input data provided.")
try:
permission = PermissionSchema().load(json_data)
print(json_data)
print(permission)
permission.data.create()
except Exception as e:
db.session.rollback()
print(traceback.format_exc())
return response_with(resp.BAD_REQUEST_400, message="Could not create permission. Error: {}".format(str(e)))
return response_with(resp.SUCCESS_201, message="Permission create success.")
class RoleListResource(Resource):
"""
角色视图:获取所有角色、创建角色
"""
# @authorize("admin:role:main")
def get(self):
roles = Role.query.all()
print(f"roles: {
roles}")
return RoleSchema(many=True).dump(roles)
@authorize("admin:role:add")
def post(self):
args = request.get_data()
if not args:
return response_with(resp.BAD_REQUEST_400, message="No input data provided.")
try:
json_data = json.loads(args)
role = RoleSchema().load(json_data)
print(json_data)
print(role)
role.data.create()
except Exception as e:
db.session.rollback()
print(traceback.format_exc())
abort(400, message="Could not create permission. Error: {}".format(str(e)))
return response_with(resp.SUCCESS_201, message="Role create success.")
class RoleResource(Resource):
"""
角色视图:获取角色详情、修改角色、删除角色
特征:单独操作某一个角色
"""
@authorize("admin:role:main")
def get(self, role_id):
"""单独获取某个角色"""
role = Role.query.get(role_id)
if not role:
return response_with(resp.NOT_FOUND_HANDLER_404, message="Role not found.")
return RoleSchema().dump(role)
@authorize("admin:role:edit")
def put(self, role_id):
"""修改角色"""
role = Role.query.get(role_id)
if not role:
return response_with(resp.NOT_FOUND_HANDLER_404, message="Role not found.")
args = request.get_data()
if not args:
return response_with(resp.BAD_REQUEST_400, message="No input data provided.")
try:
json_data = json.loads(args)
role.name = json_data.get("name", "")
role.details = json_data.get("details", "")
role.enable = json_data.get("enable", 1)
db.session.add(role)
db.session.commit()
except Exception as e:
print(traceback.format_exc())
db.session.rollback()
return response_with(resp.BAD_REQUEST_400, message="Could not update role. Error: {}".format(str(e)))
return RoleSchema().dump(role)
@authorize("admin:role:remove")
def delete(self, role_id):
"""删除角色"""
role = Role.query.get(role_id)
if not role:
return response_with(resp.NOT_FOUND_HANDLER_404, message="Role not found.")
try:
db.session.delete(role)
db.session.commit()
except Exception as e:
db.session.rollback()
abort(400, message="Could not delete role. Error: {}".format(str(e)))
return response_with(resp.SUCCESS_204, message="Role deleted successfully.")
api.add_resource(UserListResource, '/users')
api.add_resource(DeptListResource, '/depts')
api.add_resource(PermissionListResource, '/permissions')
api.add_resource(RoleListResource, '/roles')
api.add_resource(RoleResource, '/roles/<int:role_id>')
# -- coding: utf-8 --
# @Time : 2023/3/11 23:10
# @Author : hj
# @File : rights.py
import traceback
from functools import wraps
from flask_jwt_extended import get_jwt_identity, get_jwt, verify_jwt_in_request
from flask import request, jsonify
from app.utils.responses import response_with
from app.utils import responses as resp
def authorize(permission_name: str):
"""
权限校验装饰器
:param permission_name:
:return:
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 获取 Authorization 头
auth_header = request.headers.get('Authorization', "")
print(auth_header)
if not auth_header:
return response_with(resp.UNAUTHORIZED_401)
# 解析 jwt
try:
# 验证请求中存在有效的JWT,除非' ' optional=True ' '
verify_jwt_in_request()
token = auth_header.split()[1]
# 解析 jwt
user_info = get_jwt_identity()
print(f"user_info: {
user_info}")
roles = user_info.get("roles", [])
permissions = user_info.get("permissions", [])
print(f"roles: {
roles}")
print(f"permissions: {
permissions}")
# 管理员
if "管理员" in roles:
return func(*args, **kwargs)
# 检查权限
if permission_name not in permissions:
return response_with(resp.FORBIDDEN_403)
except:
print(traceback.format_exc())
return {
"msg": "Invalid token"}, 401
return func(*args, **kwargs)
return wrapper
return decorator





还没有评论,来说两句吧...