RBAC 权限管理系统

梦里梦外; 2023-09-30 23:53 136阅读 0赞
  1. # -- coding: utf-8 --
  2. # @Time : 2023/2/28 22:16
  3. # @File : models.py
  4. from werkzeug.security import check_password_hash, generate_password_hash
  5. from app import db
  6. """
  7. user - dept:一对一
  8. user - rose:多对多
  9. permission - rose:多对多
  10. """
  11. # 用户模型
  12. class User(db.Model):
  13. __tablename__ = 'admin_user'
  14. id = db.Column(db.Integer, primary_key=True)
  15. username = db.Column(db.String(32), unique=True)
  16. password_hash = db.Column(db.String(128))
  17. enable = db.Column(db.Integer, default=0, comment='启用')
  18. dept_id = db.Column(db.Integer, comment='部门id') # 一对一
  19. role = db.relationship('Role', secondary="admin_user_role", backref=db.backref('user'), lazy='dynamic')
  20. @staticmethod
  21. def generate_hash(password):
  22. return generate_password_hash(password)
  23. @staticmethod
  24. def verify_hash(password, hash):
  25. return check_password_hash(hash, password)
  26. def get_id(self):
  27. return str(self.id)
  28. @classmethod
  29. def find_by_username(cls, username):
  30. return cls.query.filter_by(username=username).first()
  31. def create(self):
  32. db.session.add(self)
  33. db.session.commit()
  34. return self
  35. # 创建中间表:用户和角色多对多
  36. user_role = db.Table(
  37. "admin_user_role", # 中间表名称
  38. db.Column("id", db.Integer, primary_key=True, autoincrement=True, comment='标识'), # 主键
  39. db.Column("user_id", db.Integer, db.ForeignKey("admin_user.id"), comment='用户编号'), # 属性 外键
  40. db.Column("role_id", db.Integer, db.ForeignKey("admin_role.id"), comment='角色编号'), # 属性 外键
  41. )
  42. # 角色模型
  43. class Role(db.Model):
  44. __tablename__ = 'admin_role'
  45. id = db.Column(db.Integer, primary_key=True, comment='角色ID')
  46. name = db.Column(db.String(255), comment='角色名称')
  47. code = db.Column(db.String(255), comment='角色标识')
  48. enable = db.Column(db.Integer, comment='是否启用')
  49. remark = db.Column(db.String(255), comment='备注')
  50. details = db.Column(db.String(255), comment='详情')
  51. sort = db.Column(db.Integer, comment='排序')
  52. permissions = db.relationship('Permission', secondary="admin_role_permission", backref=db.backref('role'))
  53. def create(self):
  54. db.session.add(self)
  55. db.session.commit()
  56. return self
  57. # 创建中间表:角色和权限多对多
  58. role_power = db.Table(
  59. "admin_role_permission", # 中间表名称
  60. db.Column("id", db.Integer, primary_key=True, autoincrement=True, comment='标识'), # 主键
  61. db.Column("permission_id", db.Integer, db.ForeignKey("admin_permission.id"), comment='用户编号'), # 属性 外键
  62. db.Column("role_id", db.Integer, db.ForeignKey("admin_role.id"), comment='角色编号'), # 属性 外键
  63. )
  64. # 权限表
  65. class Permission(db.Model):
  66. __tablename__ = 'admin_permission'
  67. id = db.Column(db.Integer, primary_key=True, comment='权限编号')
  68. name = db.Column(db.String(255), comment='权限名称')
  69. type = db.Column(db.String(1), comment='权限类型')
  70. code = db.Column(db.String(30), comment='权限标识')
  71. url = db.Column(db.String(255), comment='权限路径')
  72. open_type = db.Column(db.String(10), comment='打开方式')
  73. parent_id = db.Column(db.Integer, comment='父类编号')
  74. icon = db.Column(db.String(128), comment='图标')
  75. sort = db.Column(db.Integer, comment='排序')
  76. enable = db.Column(db.Integer, comment='是否开启')
  77. def create(self):
  78. db.session.add(self)
  79. db.session.commit()
  80. return self
  81. # 部门表:部门和用户:一对一
  82. class Dept(db.Model):
  83. __tablename__ = 'admin_dept'
  84. id = db.Column(db.Integer, primary_key=True, comment="部门ID")
  85. parent_id = db.Column(db.Integer, comment="父级编号")
  86. dept_name = db.Column(db.String(50), comment="部门名称")
  87. sort = db.Column(db.Integer, comment="排序")
  88. leader = db.Column(db.String(50), comment="负责人")
  89. phone = db.Column(db.String(20), comment="联系方式")
  90. email = db.Column(db.String(50), comment="邮箱")
  91. status = db.Column(db.Integer, comment='状态(1开启,0关闭)')
  92. remark = db.Column(db.Text, comment="备注")
  93. def create(self):
  94. db.session.add(self)
  95. db.session.commit()
  96. return self
  97. # -- coding: utf-8 --
  98. # @Time : 2023/2/28 22:22
  99. # @File : routes.py
  100. import json
  101. import traceback
  102. from flask import jsonify, make_response, request
  103. from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity
  104. from flask_restful import Api, Resource, abort
  105. from app import db
  106. from app.rbac import rbac_bp
  107. from app.rbac.models import User, Role, Dept, Permission
  108. from app.rbac.schema import UserSchema, DeptSchema, RoleSchema, PermissionSchema
  109. from app.utils.rights import authorize
  110. from app.utils.responses import response_with
  111. from app.utils import responses as resp
  112. api = Api(rbac_bp)
  113. @rbac_bp.route("/init_db/")
  114. def init_db():
  115. # 创建数据库
  116. db.create_all()
  117. return response_with(resp.SUCCESS_200, message="init table success.")
  118. @rbac_bp.route('/login/', methods=["POST"])
  119. def login():
  120. """登录"""
  121. try:
  122. data = json.loads(request.get_data())
  123. user_obj = User.find_by_username(data['username'])
  124. if not user_obj:
  125. return response_with(resp.NOT_FOUND_HANDLER_404, message="user not found.")
  126. # 登录成功,需要 生成 jwt字符串
  127. if User.verify_hash(data['password'], user_obj.password_hash):
  128. # 用户权限
  129. user_permissions = []
  130. for role in user_obj.role:
  131. # 未启用
  132. if role.enable == 0:
  133. continue
  134. for p in role.permissions:
  135. if p.enable == 0:
  136. continue
  137. user_permissions.append(p.code)
  138. payload = {
  139. 'user_id': user_obj.id,
  140. 'username': user_obj.username,
  141. 'roles': [r.name for r in user_obj.role],
  142. 'permissions': user_permissions
  143. }
  144. print(f"payload: {
  145. payload}")
  146. """
  147. {'user_id': 3, 'username': 'admin', 'roles': ['管理员'],
  148. 'permissions': ['admin:user:main', 'admin:permission:main', 'admin:permission:add', 'admin:permission:edit',
  149. 'admin:permission:remove', 'admin:role:main', 'admin:role:add', 'admin:role:edit', 'admin:role:remove',
  150. 'admin:user:add', 'admin:user:edit', 'admin:user:remove', 'admin:dept:main', 'admin:dept:add',
  151. 'admin:dept:edit', 'admin:dept:remove']}
  152. """
  153. access_token = create_access_token(identity=payload)
  154. return response_with(resp.SUCCESS_201, message=f"{
  155. user_obj.username} login success.",
  156. value={
  157. "access_token": access_token})
  158. else:
  159. return response_with((resp.UNAUTHORIZED_401))
  160. except Exception as e:
  161. print(traceback.format_exc())
  162. return response_with(resp.INVALID_INPUT_422)
  163. class UserListResource(Resource):
  164. """
  165. 用户
  166. """
  167. @authorize("admin:user:main")
  168. def get(self):
  169. users = User.query.all()
  170. return UserSchema(many=True, only=("id", "username", "dept_id", "role")).dump(users)
  171. @authorize("admin:user:add")
  172. def post(self):
  173. json_data = json.loads(request.get_data())
  174. if not json_data:
  175. return response_with(resp.BAD_REQUEST_400, message="No input data provided.")
  176. try:
  177. json_data['password_hash'] = User.generate_hash(json_data.pop('password', None))
  178. print(json_data)
  179. # user = user_schema.load(json_data)
  180. # user.create()
  181. user = User(**json_data)
  182. db.session.add(user)
  183. db.session.commit()
  184. except Exception as e:
  185. db.session.rollback()
  186. print(traceback.format_exc())
  187. return response_with(resp.BAD_REQUEST_400, message="Could not create user. Error: {}".format(str(e)))
  188. return response_with(resp.SUCCESS_201, message="User create success.")
  189. class DeptListResource(Resource):
  190. """
  191. 部门
  192. """
  193. @authorize("admin:dept:main")
  194. def get(self):
  195. depts = Dept.query.all()
  196. return DeptSchema(many=True, only=("id", "dept_name")).dump(depts)
  197. @authorize("admin:dept:add")
  198. def post(self):
  199. json_data = json.loads(request.get_data())
  200. if not json_data:
  201. return response_with(resp.BAD_REQUEST_400, message="No input data provided.")
  202. try:
  203. dept = DeptSchema().load(json_data)
  204. print(json_data)
  205. print(dept)
  206. dept.data.create()
  207. except Exception as e:
  208. db.session.rollback()
  209. print(traceback.format_exc())
  210. return response_with(resp.BAD_REQUEST_400, message="Could not create dept. Error: {}".format(str(e)))
  211. return response_with(resp.SUCCESS_201, message="Dept create success.")
  212. class PermissionListResource(Resource):
  213. """
  214. 权限
  215. """
  216. @authorize("admin:permission:main")
  217. def get(self):
  218. permissions = Permission.query.all()
  219. return PermissionSchema(many=True,
  220. only=("id", "name", "code", "type", "url", "enable", "open_type", "parent_id")).dump(
  221. permissions)
  222. @authorize("admin:permission:add")
  223. def post(self):
  224. json_data = json.loads(request.get_data())
  225. if not json_data:
  226. return response_with(resp.BAD_REQUEST_400, message="No input data provided.")
  227. try:
  228. permission = PermissionSchema().load(json_data)
  229. print(json_data)
  230. print(permission)
  231. permission.data.create()
  232. except Exception as e:
  233. db.session.rollback()
  234. print(traceback.format_exc())
  235. return response_with(resp.BAD_REQUEST_400, message="Could not create permission. Error: {}".format(str(e)))
  236. return response_with(resp.SUCCESS_201, message="Permission create success.")
  237. class RoleListResource(Resource):
  238. """
  239. 角色视图:获取所有角色、创建角色
  240. """
  241. # @authorize("admin:role:main")
  242. def get(self):
  243. roles = Role.query.all()
  244. print(f"roles: {
  245. roles}")
  246. return RoleSchema(many=True).dump(roles)
  247. @authorize("admin:role:add")
  248. def post(self):
  249. args = request.get_data()
  250. if not args:
  251. return response_with(resp.BAD_REQUEST_400, message="No input data provided.")
  252. try:
  253. json_data = json.loads(args)
  254. role = RoleSchema().load(json_data)
  255. print(json_data)
  256. print(role)
  257. role.data.create()
  258. except Exception as e:
  259. db.session.rollback()
  260. print(traceback.format_exc())
  261. abort(400, message="Could not create permission. Error: {}".format(str(e)))
  262. return response_with(resp.SUCCESS_201, message="Role create success.")
  263. class RoleResource(Resource):
  264. """
  265. 角色视图:获取角色详情、修改角色、删除角色
  266. 特征:单独操作某一个角色
  267. """
  268. @authorize("admin:role:main")
  269. def get(self, role_id):
  270. """单独获取某个角色"""
  271. role = Role.query.get(role_id)
  272. if not role:
  273. return response_with(resp.NOT_FOUND_HANDLER_404, message="Role not found.")
  274. return RoleSchema().dump(role)
  275. @authorize("admin:role:edit")
  276. def put(self, role_id):
  277. """修改角色"""
  278. role = Role.query.get(role_id)
  279. if not role:
  280. return response_with(resp.NOT_FOUND_HANDLER_404, message="Role not found.")
  281. args = request.get_data()
  282. if not args:
  283. return response_with(resp.BAD_REQUEST_400, message="No input data provided.")
  284. try:
  285. json_data = json.loads(args)
  286. role.name = json_data.get("name", "")
  287. role.details = json_data.get("details", "")
  288. role.enable = json_data.get("enable", 1)
  289. db.session.add(role)
  290. db.session.commit()
  291. except Exception as e:
  292. print(traceback.format_exc())
  293. db.session.rollback()
  294. return response_with(resp.BAD_REQUEST_400, message="Could not update role. Error: {}".format(str(e)))
  295. return RoleSchema().dump(role)
  296. @authorize("admin:role:remove")
  297. def delete(self, role_id):
  298. """删除角色"""
  299. role = Role.query.get(role_id)
  300. if not role:
  301. return response_with(resp.NOT_FOUND_HANDLER_404, message="Role not found.")
  302. try:
  303. db.session.delete(role)
  304. db.session.commit()
  305. except Exception as e:
  306. db.session.rollback()
  307. abort(400, message="Could not delete role. Error: {}".format(str(e)))
  308. return response_with(resp.SUCCESS_204, message="Role deleted successfully.")
  309. api.add_resource(UserListResource, '/users')
  310. api.add_resource(DeptListResource, '/depts')
  311. api.add_resource(PermissionListResource, '/permissions')
  312. api.add_resource(RoleListResource, '/roles')
  313. api.add_resource(RoleResource, '/roles/<int:role_id>')
  314. # -- coding: utf-8 --
  315. # @Time : 2023/3/11 23:10
  316. # @Author : hj
  317. # @File : rights.py
  318. import traceback
  319. from functools import wraps
  320. from flask_jwt_extended import get_jwt_identity, get_jwt, verify_jwt_in_request
  321. from flask import request, jsonify
  322. from app.utils.responses import response_with
  323. from app.utils import responses as resp
  324. def authorize(permission_name: str):
  325. """
  326. 权限校验装饰器
  327. :param permission_name:
  328. :return:
  329. """
  330. def decorator(func):
  331. @wraps(func)
  332. def wrapper(*args, **kwargs):
  333. # 获取 Authorization 头
  334. auth_header = request.headers.get('Authorization', "")
  335. print(auth_header)
  336. if not auth_header:
  337. return response_with(resp.UNAUTHORIZED_401)
  338. # 解析 jwt
  339. try:
  340. # 验证请求中存在有效的JWT,除非' ' optional=True ' '
  341. verify_jwt_in_request()
  342. token = auth_header.split()[1]
  343. # 解析 jwt
  344. user_info = get_jwt_identity()
  345. print(f"user_info: {
  346. user_info}")
  347. roles = user_info.get("roles", [])
  348. permissions = user_info.get("permissions", [])
  349. print(f"roles: {
  350. roles}")
  351. print(f"permissions: {
  352. permissions}")
  353. # 管理员
  354. if "管理员" in roles:
  355. return func(*args, **kwargs)
  356. # 检查权限
  357. if permission_name not in permissions:
  358. return response_with(resp.FORBIDDEN_403)
  359. except:
  360. print(traceback.format_exc())
  361. return {
  362. "msg": "Invalid token"}, 401
  363. return func(*args, **kwargs)
  364. return wrapper
  365. return decorator

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,136人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RBAC权限管理

    RBAC(Role-Based Access Control,基于角色的访问控制),就是用户通过角色与权限进行关联。简单地说,一个用户拥有若干角色,每一个角色拥有若干权限。这样

    相关 RBAC权限管理系统

    RBAC权限管理系统 RBAC权限控制机制 权限所要控制的资源类别是根据应用系统的需要而定义的,具有的语义和控制规则也是应用系统提供的,对于权限管理系统来说是透明的,权限将

    相关 RBAC权限管理

    1. 简介 RBAC是Role-Based Access Control的首字母,即基于角色的访问控制,是最简单的权限管理解决方案。它对权限的控制精度一般为节点。 基本

    相关 RBAC权限管理

    RBAC(Role-Based Access Controller,基于角色的访问控制), 概念: 就是用户通过角色与权限进行相关联; 模型: “用户—>角色—>权限”;