import jwt from flask import request, jsonify from typing import Optional, Dict from apis import api_user # TODO: Replace with your own secret key ACCESS_TOKEN_SECRET = 'access_token_secret' REFRESH_TOKEN_SECRET = 'refresh_token_secret' # 模拟用户数据 MOCK_USERS = [ { "username": "test_user", "password": "test_password" } ] def generate_access_token(user: Dict) -> str: """ 生成访问令牌 :param user: 用户信息 :return: 访问令牌 """ return jwt.encode(user, ACCESS_TOKEN_SECRET, algorithm='HS256') def generate_refresh_token(user: Dict) -> str: """ 生成刷新令牌 :param user: 用户信息 :return: 刷新令牌 """ return jwt.encode(user, REFRESH_TOKEN_SECRET, algorithm='HS256') def verify_access_token() -> Optional[Dict]: """ 验证访问令牌 :return: 用户信息(不包含密码)或 None """ auth_header = request.headers.get('Authorization') if not auth_header or not auth_header.startswith('Bearer '): return None token = auth_header.split(' ')[1] try: decoded = jwt.decode(token, ACCESS_TOKEN_SECRET, algorithms=['HS256']) username = decoded.get('username') user = next((item for item in MOCK_USERS if item['username'] == username), None) if user: userinfo = {k: v for k, v in user.items() if k != 'password'} return userinfo return None except jwt.exceptions.InvalidTokenError: return None def verify_refresh_token(token: str) -> Optional[Dict]: """ 验证刷新令牌 :param token: 刷新令牌 :return: 用户信息(不包含密码)或 None """ try: decoded = jwt.decode(token, REFRESH_TOKEN_SECRET, algorithms=['HS256']) username = decoded.get('username') user = next((item for item in MOCK_USERS if item['username'] == username), None) if user: userinfo = {k: v for k, v in user.items() if k != 'password'} return userinfo return None except jwt.exceptions.InvalidTokenError: return None ##======================================== from flask import jsonify def use_response_success(data=None): """ 成功响应 :param data: 响应数据 :return: 成功响应 JSON """ return { "code": 0, "data": data, "error": None, "message": "ok" } def use_page_response_success(page, page_size, list_data, message="ok"): """ 分页成功响应 :param page: 页码 :param page_size: 每页数量 :param list_data: 数据列表 :param message: 消息 :return: 分页成功响应 JSON """ page = int(page) page_size = int(page_size) offset = (page - 1) * page_size page_data = list_data[offset:offset + page_size] return { **use_response_success({ "items": page_data, "total": len(list_data) }), "message": message } def use_response_error(message, error=None): """ 错误响应 :param message: 错误消息 :param error: 错误信息 :return: 错误响应 JSON """ return { "code": -1, "data": None, "error": error, "message": message } def forbidden_response(message="Forbidden Exception"): """ 禁止访问响应 :param message: 错误消息 :return: 禁止访问响应 JSON """ return jsonify(use_response_error(message, message)), 403 def unAuthorizedResponse(): """ 未授权响应 :return: 未授权响应 JSON """ return jsonify(use_response_error("Unauthorized Exception", "Unauthorized Exception")), 401 def sleep(ms): """ 睡眠函数 :param ms: 睡眠时间(毫秒) :return: 无 """ import time time.sleep(ms / 1000) def pagination(page_no, page_size, array): """ 分页函数 :param page_no: 页码 :param page_size: 每页数量 :param array: 数据列表 :return: 分页后的数据列表 """ page_no = int(page_no) page_size = int(page_size) offset = (page_no - 1) * page_size if offset + page_size >= len(array): return array[offset:] return array[offset:offset + page_size] ##======================================== def info(): """ 获取用户信息 :return: 用户信息或未授权响应 """ userinfo = verify_access_token() if not userinfo: return jsonify({ "code": -1, "data": None, "error": "Unauthorized Exception", "message": "Unauthorized Exception" }), 401 return jsonify({ "code": 0, "data": userinfo, "error": None, "message": "ok" }) @api_user.route('/info', methods=['GET']) def get_info(): return info()