commit 9f27ebdea1033c869dda7a46582ef1ecc320a639 Author: 萌狼蓝天 Date: Thu Feb 20 17:50:47 2025 +0800 first commit diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/MarsCodeWorkspaceAppSettings.xml b/.idea/MarsCodeWorkspaceAppSettings.xml new file mode 100644 index 0000000..05ed8ba --- /dev/null +++ b/.idea/MarsCodeWorkspaceAppSettings.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..051acb4 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,88 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/libroro-server.iml b/.idea/libroro-server.iml new file mode 100644 index 0000000..43916a1 --- /dev/null +++ b/.idea/libroro-server.iml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..0d614f4 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..1a3563e --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/READNE.md b/READNE.md new file mode 100644 index 0000000..58458ed --- /dev/null +++ b/READNE.md @@ -0,0 +1,3 @@ +接口文档 + +pip install --upgrade \ No newline at end of file diff --git a/__pycache__/app.cpython-311.pyc b/__pycache__/app.cpython-311.pyc new file mode 100644 index 0000000..437f084 Binary files /dev/null and b/__pycache__/app.cpython-311.pyc differ diff --git a/__pycache__/test.cpython-311.pyc b/__pycache__/test.cpython-311.pyc new file mode 100644 index 0000000..f3985bd Binary files /dev/null and b/__pycache__/test.cpython-311.pyc differ diff --git a/apis/__init__.py b/apis/__init__.py new file mode 100644 index 0000000..e267c18 --- /dev/null +++ b/apis/__init__.py @@ -0,0 +1,9 @@ +from flask import Blueprint +# 系统相关接口 例如对系统的一些设置 +api_system = Blueprint('system', __name__) +# 权限相关接口 例如菜单 +api_auth = Blueprint('auth', __name__) +# 用户相关接口 例如用户信息 +api_user = Blueprint('user', __name__) +# 其他一些不便于分类的接口 +api_other = Blueprint('other', __name__) \ No newline at end of file diff --git a/apis/__pycache__/__init__.cpython-311.pyc b/apis/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..ae423cf Binary files /dev/null and b/apis/__pycache__/__init__.cpython-311.pyc differ diff --git a/apis/__pycache__/api_user.cpython-311.pyc b/apis/__pycache__/api_user.cpython-311.pyc new file mode 100644 index 0000000..5a0b464 Binary files /dev/null and b/apis/__pycache__/api_user.cpython-311.pyc differ diff --git a/apis/api_user.py b/apis/api_user.py new file mode 100644 index 0000000..b3dca92 --- /dev/null +++ b/apis/api_user.py @@ -0,0 +1,201 @@ +import jwt +from flask import request, jsonify +from typing import Optional, Dict + +from apis import api_user + +# TODO: Replace with your own secret key +ACCESS_TOKEN_SECRET = 'access_token_secret' +REFRESH_TOKEN_SECRET = 'refresh_token_secret' + +# 模拟用户数据 +MOCK_USERS = [ + { + "username": "test_user", + "password": "test_password" + } +] + + +def generate_access_token(user: Dict) -> str: + """ + 生成访问令牌 + :param user: 用户信息 + :return: 访问令牌 + """ + return jwt.encode(user, ACCESS_TOKEN_SECRET, algorithm='HS256') + + +def generate_refresh_token(user: Dict) -> str: + """ + 生成刷新令牌 + :param user: 用户信息 + :return: 刷新令牌 + """ + return jwt.encode(user, REFRESH_TOKEN_SECRET, algorithm='HS256') + + +def verify_access_token() -> Optional[Dict]: + """ + 验证访问令牌 + :return: 用户信息(不包含密码)或 None + """ + auth_header = request.headers.get('Authorization') + if not auth_header or not auth_header.startswith('Bearer '): + return None + + token = auth_header.split(' ')[1] + try: + decoded = jwt.decode(token, ACCESS_TOKEN_SECRET, algorithms=['HS256']) + username = decoded.get('username') + user = next((item for item in MOCK_USERS if item['username'] == username), None) + if user: + userinfo = {k: v for k, v in user.items() if k != 'password'} + return userinfo + return None + except jwt.exceptions.InvalidTokenError: + return None + + +def verify_refresh_token(token: str) -> Optional[Dict]: + """ + 验证刷新令牌 + :param token: 刷新令牌 + :return: 用户信息(不包含密码)或 None + """ + try: + decoded = jwt.decode(token, REFRESH_TOKEN_SECRET, algorithms=['HS256']) + username = decoded.get('username') + user = next((item for item in MOCK_USERS if item['username'] == username), None) + if user: + userinfo = {k: v for k, v in user.items() if k != 'password'} + return userinfo + return None + except jwt.exceptions.InvalidTokenError: + return None + + +##======================================== +from flask import jsonify + + +def use_response_success(data=None): + """ + 成功响应 + :param data: 响应数据 + :return: 成功响应 JSON + """ + return { + "code": 0, + "data": data, + "error": None, + "message": "ok" + } + + +def use_page_response_success(page, page_size, list_data, message="ok"): + """ + 分页成功响应 + :param page: 页码 + :param page_size: 每页数量 + :param list_data: 数据列表 + :param message: 消息 + :return: 分页成功响应 JSON + """ + page = int(page) + page_size = int(page_size) + offset = (page - 1) * page_size + page_data = list_data[offset:offset + page_size] + return { + **use_response_success({ + "items": page_data, + "total": len(list_data) + }), + "message": message + } + + +def use_response_error(message, error=None): + """ + 错误响应 + :param message: 错误消息 + :param error: 错误信息 + :return: 错误响应 JSON + """ + return { + "code": -1, + "data": None, + "error": error, + "message": message + } + + +def forbidden_response(message="Forbidden Exception"): + """ + 禁止访问响应 + :param message: 错误消息 + :return: 禁止访问响应 JSON + """ + return jsonify(use_response_error(message, message)), 403 + + +def unAuthorizedResponse(): + """ + 未授权响应 + :return: 未授权响应 JSON + """ + return jsonify(use_response_error("Unauthorized Exception", "Unauthorized Exception")), 401 + + +def sleep(ms): + """ + 睡眠函数 + :param ms: 睡眠时间(毫秒) + :return: 无 + """ + import time + time.sleep(ms / 1000) + + +def pagination(page_no, page_size, array): + """ + 分页函数 + :param page_no: 页码 + :param page_size: 每页数量 + :param array: 数据列表 + :return: 分页后的数据列表 + """ + page_no = int(page_no) + page_size = int(page_size) + offset = (page_no - 1) * page_size + if offset + page_size >= len(array): + return array[offset:] + return array[offset:offset + page_size] + + +##======================================== + +def info(): + """ + 获取用户信息 + :return: 用户信息或未授权响应 + """ + userinfo = verify_access_token() + if not userinfo: + return jsonify({ + "code": -1, + "data": None, + "error": "Unauthorized Exception", + "message": "Unauthorized Exception" + }), 401 + return jsonify({ + "code": 0, + "data": userinfo, + "error": None, + "message": "ok" + }) + + +@api_user.route('/info', methods=['GET']) +def get_info(): + return info() diff --git a/app.py b/app.py new file mode 100644 index 0000000..98a465a --- /dev/null +++ b/app.py @@ -0,0 +1,30 @@ +from flask import Flask, jsonify + +from apis import api_system, api_auth, api_user, api_other +from apis.api_user import use_response_error + +app = Flask(__name__) + + + +# 注册蓝图 +app.register_blueprint(api_system, url_prefix='/system') +app.register_blueprint(api_auth, url_prefix='/auth') +app.register_blueprint(api_user, url_prefix='/api/user') +app.register_blueprint(api_other, url_prefix='/other') +import logging + +# 配置日志记录 +logging.basicConfig(level=logging.DEBUG) + +@app.errorhandler(404) +def page_not_found(e): + logging.error(f"404 Error: {e}") + return jsonify(use_response_error("Page not found", str(e))), 404 + +@app.errorhandler(Exception) +def internal_server_error(e): + logging.error(f"Internal Server Error: {e}", exc_info=True) + return jsonify(use_response_error("Internal Server Error", str(e))), 500 +if __name__ == '__main__': + app.run(host="0.0.0.0",port=5566,debug=True) \ No newline at end of file diff --git a/database/__init__.py b/database/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/old.utils/__init__.py b/old.utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/old.utils/cookie_utils.py b/old.utils/cookie_utils.py new file mode 100644 index 0000000..1312237 --- /dev/null +++ b/old.utils/cookie_utils.py @@ -0,0 +1,14 @@ +from flask import request, make_response + +def clear_refresh_token_cookie(): + resp = make_response() + resp.delete_cookie('jwt', httponly=True, samesite='None', secure=True) + return resp + +def set_refresh_token_cookie(refresh_token): + resp = make_response() + resp.set_cookie('jwt', refresh_token, httponly=True, max_age=24 * 60 * 60 * 1000, samesite='None', secure=True) + return resp + +def get_refresh_token_from_cookie(): + return request.cookies.get('jwt') \ No newline at end of file diff --git a/old.utils/jwt_utils.py b/old.utils/jwt_utils.py new file mode 100644 index 0000000..8e2abd6 --- /dev/null +++ b/old.utils/jwt_utils.py @@ -0,0 +1,52 @@ +import jwt +from datetime import datetime, timedelta +from flask import request +from mock_data import MOCK_USERS, UserInfo + +# TODO: Replace with your own secret key +ACCESS_TOKEN_SECRET = 'access_token_secret' +REFRESH_TOKEN_SECRET = 'refresh_token_secret' + +def generate_access_token(user: UserInfo): + payload = { + 'user': user, + 'iat': datetime.utcnow(), + 'exp': datetime.utcnow() + timedelta(days=7) + } + return jwt.encode(payload, ACCESS_TOKEN_SECRET, algorithm='HS256') + +def generate_refresh_token(user: UserInfo): + payload = { + 'user': user, + 'iat': datetime.utcnow(), + 'exp': datetime.utcnow() + timedelta(days=30) + } + return jwt.encode(payload, REFRESH_TOKEN_SECRET, algorithm='HS256') + +def verify_access_token(): + auth_header = request.headers.get('Authorization') + if not auth_header or not auth_header.startswith('Bearer '): + return None + token = auth_header.split(' ')[1] + try: + decoded = jwt.decode(token, ACCESS_TOKEN_SECRET, algorithms=['HS256']) + username = decoded['user']['username'] + user = next((u for u in MOCK_USERS if u['username'] == username), None) + if user: + user_info = {k: v for k, v in user.items() if k != 'password'} + return user_info + return None + except jwt.PyJWTError: + return None + +def verify_refresh_token(token): + try: + decoded = jwt.decode(token, REFRESH_TOKEN_SECRET, algorithms=['HS256']) + username = decoded['user']['username'] + user = next((u for u in MOCK_USERS if u['username'] == username), None) + if user: + user_info = {k: v for k, v in user.items() if k != 'password'} + return user_info + return None + except jwt.PyJWTError: + return None \ No newline at end of file diff --git a/old.utils/response_utils.py b/old.utils/response_utils.py new file mode 100644 index 0000000..423c3f4 --- /dev/null +++ b/old.utils/response_utils.py @@ -0,0 +1,42 @@ +from flask import jsonify + +def use_response_success(data=None): + return jsonify({ + 'code': 0, + 'data': data, + 'error': None, + 'message': 'ok' + }) + +def use_page_response_success(page, page_size, list_data, message='ok'): + page = int(page) + page_size = int(page_size) + offset = (page - 1) * page_size + page_data = list_data[offset:offset + page_size] + return jsonify({ + 'code': 0, + 'data': { + 'items': page_data, + 'total': len(list_data) + }, + 'error': None, + 'message': message + }) + +def use_response_error(message, error=None): + return jsonify({ + 'code': -1, + 'data': None, + 'error': error, + 'message': message + }) + +def forbidden_response(message='Forbidden Exception'): + return use_response_error(message, message), 403 + +def un_authorized_response(): + return use_response_error('Unauthorized Exception', 'Unauthorized Exception'), 401 + +def sleep(ms): + import time + time.sleep(ms / 1000) \ No newline at end of file diff --git a/old_routes/__init__.py b/old_routes/__init__.py new file mode 100644 index 0000000..348889a --- /dev/null +++ b/old_routes/__init__.py @@ -0,0 +1,5 @@ +from flask import Blueprint + +auth_bp = Blueprint('auth', __name__) +other_bp = Blueprint('other', __name__) +user_bp = Blueprint('user', __name__) \ No newline at end of file diff --git a/old_routes/__pycache__/__init__.cpython-311.pyc b/old_routes/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..e465dd3 Binary files /dev/null and b/old_routes/__pycache__/__init__.cpython-311.pyc differ diff --git a/old_routes/auth_routes.py b/old_routes/auth_routes.py new file mode 100644 index 0000000..2d5c210 --- /dev/null +++ b/old_routes/auth_routes.py @@ -0,0 +1,52 @@ +from flask import Blueprint, request, jsonify + +from old_routes import auth_bp +from utils.cookie_utils import clear_refresh_token_cookie, set_refresh_token_cookie, get_refresh_token_from_cookie +from utils.jwt_utils import generate_access_token, generate_refresh_token, verify_access_token, verify_refresh_token +from utils.response_utils import use_response_success, use_response_error, forbidden_response, un_authorized_response +from mock_data import MOCK_USERS + + + +@auth_bp.route('/login', methods=['POST']) +def login(): + data = request.get_json() + username = data.get('username') + password = data.get('password') + if not username or not password: + return use_response_error('BadRequestException', 'Username and password are required'), 400 + user = next((u for u in MOCK_USERS if u['username'] == username and u['password'] == password), None) + if not user: + clear_refresh_token_cookie() + return forbidden_response('Username or password is incorrect.') + access_token = generate_access_token(user) + refresh_token = generate_refresh_token(user) + set_refresh_token_cookie(refresh_token) + return use_response_success({ + **user, + 'access_token': access_token + }) + +@auth_bp.route('/refresh', methods=['POST']) +def refresh(): + refresh_token = get_refresh_token_from_cookie() + if not refresh_token: + return forbidden_response() + clear_refresh_token_cookie() + user_info = verify_refresh_token(refresh_token) + if not user_info: + return forbidden_response() + user = next((u for u in MOCK_USERS if u['username'] == user_info['username']), None) + if not user: + return forbidden_response() + access_token = generate_access_token(user) + set_refresh_token_cookie(refresh_token) + return jsonify({'access_token': access_token}) + +@auth_bp.route('/logout', methods=['POST']) +def logout(): + refresh_token = get_refresh_token_from_cookie() + if not refresh_token: + return use_response_success('') + clear_refresh_token_cookie() + return use_response_success('') \ No newline at end of file diff --git a/old_routes/mock_data.py b/old_routes/mock_data.py new file mode 100644 index 0000000..78682d0 --- /dev/null +++ b/old_routes/mock_data.py @@ -0,0 +1,181 @@ +from typing import List, Dict + +# 定义用户信息类型 +UserInfo = Dict[str, str | int | List[str]] + +# 模拟用户数据 +MOCK_USERS: List[UserInfo] = [ + { + 'id': 0, + 'password': '123456', + 'realName': 'Vben', + 'roles': ['super'], + 'username': 'vben' + }, + { + 'id': 1, + 'password': '123456', + 'realName': 'Admin', + 'roles': ['admin'], + 'username': 'admin', + 'homePath': '/workspace' + }, + { + 'id': 2, + 'password': '123456', + 'realName': 'Jack', + 'roles': ['user'], + 'username': 'jack', + 'homePath': '/analytics' + } +] + +# 模拟代码数据 +MOCK_CODES = [ + { + 'codes': ['AC_100100', 'AC_100110', 'AC_100120', 'AC_100010'], + 'username': 'vben' + }, + { + 'codes': ['AC_100010', 'AC_100020', 'AC_100030'], + 'username': 'admin' + }, + { + 'codes': ['AC_1000001', 'AC_1000002'], + 'username': 'jack' + } +] + +# 模拟菜单数据 +dashboard_menus = [ + { + 'meta': { + 'order': -1, + 'title': 'page.dashboard.title' + }, + 'name': 'Dashboard', + 'path': '/dashboard', + 'redirect': '/analytics', + 'children': [ + { + 'name': 'Analytics', + 'path': '/analytics', + 'component': '/dashboard/analytics/index', + 'meta': { + 'affixTab': True, + 'title': 'page.dashboard.analytics' + } + }, + { + 'name': 'Workspace', + 'path': '/workspace', + 'component': '/dashboard/workspace/index', + 'meta': { + 'title': 'page.dashboard.workspace' + } + } + ] + } +] + +def create_demos_menus(role: str): + role_with_menus = { + 'admin': { + 'component': '/demos/access/admin-visible', + 'meta': { + 'icon': 'mdi:button-cursor', + 'title': 'demos.access.adminVisible' + }, + 'name': 'AccessAdminVisibleDemo', + 'path': '/demos/access/admin-visible' + }, + 'super': { + 'component': '/demos/access/super-visible', + 'meta': { + 'icon': 'mdi:button-cursor', + 'title': 'demos.access.superVisible' + }, + 'name': 'AccessSuperVisibleDemo', + 'path': '/demos/access/super-visible' + }, + 'user': { + 'component': '/demos/access/user-visible', + 'meta': { + 'icon': 'mdi:button-cursor', + 'title': 'demos.access.userVisible' + }, + 'name': 'AccessUserVisibleDemo', + 'path': '/demos/access/user-visible' + } + } + return [ + { + 'meta': { + 'icon': 'ic:baseline-view-in-ar', + 'keepAlive': True, + 'order': 1000, + 'title': 'demos.title' + }, + 'name': 'Demos', + 'path': '/demos', + 'redirect': '/demos/access', + 'children': [ + { + 'name': 'AccessDemos', + 'path': '/demosaccess', + 'meta': { + 'icon': 'mdi:cloud-key-outline', + 'title': 'demos.access.backendPermissions' + }, + 'redirect': '/demos/access/page-control', + 'children': [ + { + 'name': 'AccessPageControlDemo', + 'path': '/demos/access/page-control', + 'component': '/demos/access/index', + 'meta': { + 'icon': 'mdi:page-previous-outline', + 'title': 'demos.access.pageAccess' + } + }, + { + 'name': 'AccessButtonControlDemo', + 'path': '/demos/access/button-control', + 'component': '/demos/access/button-control', + 'meta': { + 'icon': 'mdi:button-cursor', + 'title': 'demos.access.buttonControl' + } + }, + { + 'name': 'AccessMenuVisible403Demo', + 'path': '/demos/access/menu-visible-403', + 'component': '/demos/access/menu-visible-403', + 'meta': { + 'authority': ['no-body'], + 'icon': 'mdi:button-cursor', + 'menuVisibleWithForbidden': True, + 'title': 'demos.access.menuVisible403' + } + }, + role_with_menus[role] + ] + } + ] + } + ] + +MOCK_MENUS = [ + { + 'menus': [*dashboard_menus, *create_demos_menus('super')], + 'username': 'vben' + }, + { + 'menus': [*dashboard_menus, *create_demos_menus('admin')], + 'username': 'admin' + }, + { + 'menus': [*dashboard_menus, *create_demos_menus('user')], + 'username': 'jack' + } +] \ No newline at end of file diff --git a/old_routes/other_routes.py b/old_routes/other_routes.py new file mode 100644 index 0000000..9e11ecb --- /dev/null +++ b/old_routes/other_routes.py @@ -0,0 +1,82 @@ +from flask import Blueprint, request + +from old_routes import other_bp +from utils.jwt_utils import verify_access_token +from utils.response_utils import use_response_success, use_page_response_success, un_authorized_response +from mock_data import MOCK_CODES, MOCK_MENUS +from faker import Faker + + + +@other_bp.route('/all', methods=['GET']) +def get_all(): + user_info = verify_access_token() + if not user_info: + return un_authorized_response() + menus = next((m['menus'] for m in MOCK_MENUS if m['username'] == user_info['username']), []) + return use_response_success(menus) + +@other_bp.route('/codes', methods=['GET']) +def get_codes(): + user_info = verify_access_token() + if not user_info: + return un_authorized_response() + codes = next((c['codes'] for c in MOCK_CODES if c['username'] == user_info['username']), []) + return use_response_success(codes) + +@other_bp.route('/info', methods=['GET']) +def get_info(): + user_info = verify_access_token() + if not user_info: + return un_authorized_response() + return use_response_success(user_info) + +@other_bp.route('/list', methods=['GET']) +def get_list(): + user_info = verify_access_token() + if not user_info: + return un_authorized_response() + fake = Faker() + def generate_mock_data_list(count): + data_list = [] + for _ in range(count): + data_item = { + 'id': fake.uuid4(), + 'imageUrl': fake.image_url(), + 'imageUrl2': fake.image_url(), + 'open': fake.boolean(), + 'status': fake.random_element(['success', 'error', 'warning']), + 'productName': fake.word(), + 'price': fake.random_int(min=1, max=1000), + 'currency': fake.currency_code(), + 'quantity': fake.random_int(min=1, max=100), + 'available': fake.boolean(), + 'category': fake.word(), + 'releaseDate': fake.date_time_this_decade().strftime('%Y-%m-%d'), + 'rating': fake.random_float(min=1, max=5), + 'description': fake.sentence(), + 'weight': fake.random_float(min=0.1, max=10), + 'color': fake.color_name(), + 'inProduction': fake.boolean(), + 'tags': [fake.word() for _ in range(3)] + } + data_list.append(data_item) + return data_list + mock_data = generate_mock_data_list(100) + page = request.args.get('page', 1) + page_size = request.args.get('pageSize', 10) + sort_by = request.args.get('sortBy') + sort_order = request.args.get('sortOrder', 'asc') + list_data = mock_data.copy() + if sort_by and sort_by in list_data[0]: + if sort_order == 'asc': + if sort_by == 'price': + list_data.sort(key=lambda x: float(x[sort_by])) + else: + list_data.sort(key=lambda x: x[sort_by]) + else: + if sort_by == 'price': + list_data.sort(key=lambda x: float(x[sort_by]), reverse=True) + else: + list_data.sort(key=lambda x: x[sort_by], reverse=True) + return use_page_response_success(page, page_size, list_data) \ No newline at end of file diff --git a/old_routes/user_routes.py b/old_routes/user_routes.py new file mode 100644 index 0000000..4299a6d --- /dev/null +++ b/old_routes/user_routes.py @@ -0,0 +1,208 @@ +from flask import Blueprint, request, jsonify, make_response +import jwt +from datetime import datetime, timedelta +import os + +# 创建蓝图对象 +user_bp = Blueprint('user', __name__) + +# 模拟用户数据 +MOCK_USERS = [ + { + "username": "testuser", + "password": "testpassword" + } +] + +# 模拟代码数据 +MOCK_CODES = [ + { + "username": "testuser", + "codes": ["code1", "code2"] + } +] + + +@user_bp.route('/info', methods=['GET']) +def get_user_info(): + """获取用户信息接口""" + # 此接口暂时返回一个简单信息,可根据实际需求完善 + return jsonify({"message": "User info endpoint"}) + + +# 定义生成 access token 的函数 +def generate_access_token(user): + """生成 access token""" + # 设置 token 过期时间为 15 分钟 + expiration_time = datetime.utcnow() + timedelta(minutes=15) + # 生成 payload,包含用户信息和过期时间 + payload = { + 'username': user['username'], + 'exp': expiration_time + } + # 使用 jwt 库生成 token,密钥从环境变量中获取 + token = jwt.encode(payload, os.getenv('SECRET_KEY'), algorithm='HS256') + return token + + +# 定义生成 refresh token 的函数 +def generate_refresh_token(user): + """生成 refresh token""" + # 设置 refresh token 过期时间为 7 天 + expiration_time = datetime.utcnow() + timedelta(days=7) + # 生成 payload,包含用户信息和过期时间 + payload = { + 'username': user['username'], + 'exp': expiration_time + } + # 使用 jwt 库生成 token,密钥从环境变量中获取 + token = jwt.encode(payload, os.getenv('SECRET_KEY'), algorithm='HS256') + return token + + +# 定义验证 access token 的函数 +def verify_access_token(token): + """验证 access token""" + try: + # 解码 token,密钥从环境变量中获取,算法为 HS256 + data = jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=['HS256']) + # 从解码后的数据中获取用户名 + username = data.get('username') + # 在模拟用户数据中查找该用户 + user = next((user for user in MOCK_USERS if user['username'] == username), None) + return user + except jwt.ExpiredSignatureError: + return None + except jwt.InvalidTokenError: + return None + + +# 定义验证 refresh token 的函数 +def verify_refresh_token(token): + """验证 refresh token""" + try: + # 解码 token,密钥从环境变量中获取,算法为 HS256 + data = jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=['HS256']) + # 从解码后的数据中获取用户名 + username = data.get('username') + # 在模拟用户数据中查找该用户 + user = next((user for user in MOCK_USERS if user['username'] == username), None) + return user + except jwt.ExpiredSignatureError: + return None + except jwt.InvalidTokenError: + return None + + +# 登录接口 +@user_bp.route('/login', methods=['POST']) +def login(): + """用户登录接口""" + # 获取请求中的 JSON 数据 + data = request.get_json() + if not data: + # 如果请求中没有 JSON 数据,返回 400 错误 + return make_response(jsonify({"error": "No JSON data provided"}), 400) + + # 从 JSON 数据中获取用户名和密码 + username = data.get('username') + password = data.get('password') + + # 检查用户名和密码是否都存在 + if not username or not password: + # 如果不存在,返回 400 错误 + return make_response(jsonify({"error": "Username and password are required"}), 400) + + # 在模拟用户数据中查找该用户 + find_user = next((user for user in MOCK_USERS if user['username'] == username and user['password'] == password), None) + + if not find_user: + # 如果用户不存在,返回 403 错误 + return make_response(jsonify({"error": "Username or password is incorrect."}), 403) + + # 生成 access token 和 refresh token + access_token = generate_access_token(find_user) + refresh_token = generate_refresh_token(find_user) + + # 返回成功响应,包含用户信息和 access token + response_data = find_user.copy() # 复制 find_user 字典,避免修改原始数据 + response_data["access_token"] = access_token + response_data["refresh_token"] = refresh_token + + return jsonify(response_data) + + +# 获取代码接口 +@user_bp.route('/codes', methods=['GET']) +def get_codes(): + """获取用户代码接口""" + # 获取请求头中的 Authorization 字段 + auth_header = request.headers.get('Authorization') + if not auth_header: + # 如果没有提供 Authorization 字段,返回 401 错误 + return make_response(jsonify({"error": "Unauthorized"}), 401) + + # 提取 token + try: + token = auth_header.split(" ")[1] + except IndexError: + # 如果 Authorization 字段格式错误,返回 401 错误 + return make_response(jsonify({"error": "Unauthorized"}), 401) + + # 验证 access token + userinfo = verify_access_token(token) + if not userinfo: + # 如果验证失败,返回 401 错误 + return make_response(jsonify({"error": "Unauthorized"}), 401) + + # 在模拟代码数据中查找该用户的代码 + codes = next((item['codes'] for item in MOCK_CODES if item['username'] == userinfo['username']), []) + # 返回成功响应,包含代码数据 + return jsonify(codes) + + +# 刷新 token 接口 +@user_bp.route('/refresh', methods=['POST']) +def refresh_token(): + """刷新 token 接口""" + # 获取请求头中的 Authorization 字段 + auth_header = request.headers.get('Authorization') + if not auth_header: + # 如果没有提供 Authorization 字段,返回 403 错误 + return make_response(jsonify({"error": "Forbidden"}), 403) + + # 提取 refresh token + try: + refresh_token = auth_header.split(" ")[1] + except IndexError: + # 如果 Authorization 字段格式错误,返回 403 错误 + return make_response(jsonify({"error": "Forbidden"}), 403) + + if not refresh_token: + # 如果没有提供 refresh token,返回 403 错误 + return make_response(jsonify({"error": "Forbidden"}), 403) + + # 验证 refresh token + userinfo = verify_refresh_token(refresh_token) + if not userinfo: + # 如果验证失败,返回 403 错误 + return make_response(jsonify({"error": "Forbidden"}), 403) + + # 在模拟用户数据中查找该用户 + find_user = next((user for user in MOCK_USERS if user['username'] == userinfo['username']), None) + if not find_user: + # 如果用户不存在,返回 403 错误 + return make_response(jsonify({"error": "Forbidden"}), 403) + + # 生成新的 access token + access_token = generate_access_token(find_user) + # 返回新的 access token + return jsonify({"access_token": access_token}) + + +# 登出接口 +@user_bp.route('/logout', methods=['POST']) +def logout(): + """用户登出接口""" + # 这里简单返回成功响应,因为在 Python 中没有类似清除 cookie 的操作示例 + return jsonify({"message": "Logged out successfully"}) diff --git a/test.py b/test.py new file mode 100644 index 0000000..bf17ea3 --- /dev/null +++ b/test.py @@ -0,0 +1,25 @@ + +import socket + +def test_port_binding(host, port): + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind((host, port)) + print(f"成功绑定到 {host}:{port}") + except PermissionError: + print(f"绑定到 {host}:{port} 时权限不足") + except Exception as e: + print(f"绑定到 {host}:{port} 时发生错误: {e}") + +# 测试绑定 +test_port_binding("127.0.0.1", 3000) +test_port_binding("0.0.0.0", 3000) + + +from flask import Flask + +app = Flask(__name__) +if __name__ == '__main__': + app.run(host="127.0.0.1", port=3000) # HTTPS required for secure cookies + +