flask-libroro/apis/api_user.py
2025-02-20 17:50:47 +08:00

202 lines
4.8 KiB
Python

import jwt
from flask import request, jsonify
from typing import Optional, Dict
from apis import api_user
# TODO: Replace with your own secret key
ACCESS_TOKEN_SECRET = 'access_token_secret'
REFRESH_TOKEN_SECRET = 'refresh_token_secret'
# 模拟用户数据
MOCK_USERS = [
{
"username": "test_user",
"password": "test_password"
}
]
def generate_access_token(user: Dict) -> str:
"""
生成访问令牌
:param user: 用户信息
:return: 访问令牌
"""
return jwt.encode(user, ACCESS_TOKEN_SECRET, algorithm='HS256')
def generate_refresh_token(user: Dict) -> str:
"""
生成刷新令牌
:param user: 用户信息
:return: 刷新令牌
"""
return jwt.encode(user, REFRESH_TOKEN_SECRET, algorithm='HS256')
def verify_access_token() -> Optional[Dict]:
"""
验证访问令牌
:return: 用户信息(不包含密码)或 None
"""
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return None
token = auth_header.split(' ')[1]
try:
decoded = jwt.decode(token, ACCESS_TOKEN_SECRET, algorithms=['HS256'])
username = decoded.get('username')
user = next((item for item in MOCK_USERS if item['username'] == username), None)
if user:
userinfo = {k: v for k, v in user.items() if k != 'password'}
return userinfo
return None
except jwt.exceptions.InvalidTokenError:
return None
def verify_refresh_token(token: str) -> Optional[Dict]:
"""
验证刷新令牌
:param token: 刷新令牌
:return: 用户信息(不包含密码)或 None
"""
try:
decoded = jwt.decode(token, REFRESH_TOKEN_SECRET, algorithms=['HS256'])
username = decoded.get('username')
user = next((item for item in MOCK_USERS if item['username'] == username), None)
if user:
userinfo = {k: v for k, v in user.items() if k != 'password'}
return userinfo
return None
except jwt.exceptions.InvalidTokenError:
return None
##========================================
from flask import jsonify
def use_response_success(data=None):
"""
成功响应
:param data: 响应数据
:return: 成功响应 JSON
"""
return {
"code": 0,
"data": data,
"error": None,
"message": "ok"
}
def use_page_response_success(page, page_size, list_data, message="ok"):
"""
分页成功响应
:param page: 页码
:param page_size: 每页数量
:param list_data: 数据列表
:param message: 消息
:return: 分页成功响应 JSON
"""
page = int(page)
page_size = int(page_size)
offset = (page - 1) * page_size
page_data = list_data[offset:offset + page_size]
return {
**use_response_success({
"items": page_data,
"total": len(list_data)
}),
"message": message
}
def use_response_error(message, error=None):
"""
错误响应
:param message: 错误消息
:param error: 错误信息
:return: 错误响应 JSON
"""
return {
"code": -1,
"data": None,
"error": error,
"message": message
}
def forbidden_response(message="Forbidden Exception"):
"""
禁止访问响应
:param message: 错误消息
:return: 禁止访问响应 JSON
"""
return jsonify(use_response_error(message, message)), 403
def unAuthorizedResponse():
"""
未授权响应
:return: 未授权响应 JSON
"""
return jsonify(use_response_error("Unauthorized Exception", "Unauthorized Exception")), 401
def sleep(ms):
"""
睡眠函数
:param ms: 睡眠时间(毫秒)
:return: 无
"""
import time
time.sleep(ms / 1000)
def pagination(page_no, page_size, array):
"""
分页函数
:param page_no: 页码
:param page_size: 每页数量
:param array: 数据列表
:return: 分页后的数据列表
"""
page_no = int(page_no)
page_size = int(page_size)
offset = (page_no - 1) * page_size
if offset + page_size >= len(array):
return array[offset:]
return array[offset:offset + page_size]
##========================================
def info():
"""
获取用户信息
:return: 用户信息或未授权响应
"""
userinfo = verify_access_token()
if not userinfo:
return jsonify({
"code": -1,
"data": None,
"error": "Unauthorized Exception",
"message": "Unauthorized Exception"
}), 401
return jsonify({
"code": 0,
"data": userinfo,
"error": None,
"message": "ok"
})
@api_user.route('/info', methods=['GET'])
def get_info():
return info()