flask-libroro/old_routes/user_routes.py

209 lines
7.0 KiB
Python
Raw Normal View History

2025-02-20 09:50:47 +00:00
from flask import Blueprint, request, jsonify, make_response
import jwt
from datetime import datetime, timedelta
import os
# 创建蓝图对象
user_bp = Blueprint('user', __name__)
# 模拟用户数据
MOCK_USERS = [
{
"username": "testuser",
"password": "testpassword"
}
]
# 模拟代码数据
MOCK_CODES = [
{
"username": "testuser",
"codes": ["code1", "code2"]
}
]
@user_bp.route('/info', methods=['GET'])
def get_user_info():
"""获取用户信息接口"""
# 此接口暂时返回一个简单信息,可根据实际需求完善
return jsonify({"message": "User info endpoint"})
# 定义生成 access token 的函数
def generate_access_token(user):
"""生成 access token"""
# 设置 token 过期时间为 15 分钟
expiration_time = datetime.utcnow() + timedelta(minutes=15)
# 生成 payload包含用户信息和过期时间
payload = {
'username': user['username'],
'exp': expiration_time
}
# 使用 jwt 库生成 token密钥从环境变量中获取
token = jwt.encode(payload, os.getenv('SECRET_KEY'), algorithm='HS256')
return token
# 定义生成 refresh token 的函数
def generate_refresh_token(user):
"""生成 refresh token"""
# 设置 refresh token 过期时间为 7 天
expiration_time = datetime.utcnow() + timedelta(days=7)
# 生成 payload包含用户信息和过期时间
payload = {
'username': user['username'],
'exp': expiration_time
}
# 使用 jwt 库生成 token密钥从环境变量中获取
token = jwt.encode(payload, os.getenv('SECRET_KEY'), algorithm='HS256')
return token
# 定义验证 access token 的函数
def verify_access_token(token):
"""验证 access token"""
try:
# 解码 token密钥从环境变量中获取算法为 HS256
data = jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=['HS256'])
# 从解码后的数据中获取用户名
username = data.get('username')
# 在模拟用户数据中查找该用户
user = next((user for user in MOCK_USERS if user['username'] == username), None)
return user
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
# 定义验证 refresh token 的函数
def verify_refresh_token(token):
"""验证 refresh token"""
try:
# 解码 token密钥从环境变量中获取算法为 HS256
data = jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=['HS256'])
# 从解码后的数据中获取用户名
username = data.get('username')
# 在模拟用户数据中查找该用户
user = next((user for user in MOCK_USERS if user['username'] == username), None)
return user
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
# 登录接口
@user_bp.route('/login', methods=['POST'])
def login():
"""用户登录接口"""
# 获取请求中的 JSON 数据
data = request.get_json()
if not data:
# 如果请求中没有 JSON 数据,返回 400 错误
return make_response(jsonify({"error": "No JSON data provided"}), 400)
# 从 JSON 数据中获取用户名和密码
username = data.get('username')
password = data.get('password')
# 检查用户名和密码是否都存在
if not username or not password:
# 如果不存在,返回 400 错误
return make_response(jsonify({"error": "Username and password are required"}), 400)
# 在模拟用户数据中查找该用户
find_user = next((user for user in MOCK_USERS if user['username'] == username and user['password'] == password), None)
if not find_user:
# 如果用户不存在,返回 403 错误
return make_response(jsonify({"error": "Username or password is incorrect."}), 403)
# 生成 access token 和 refresh token
access_token = generate_access_token(find_user)
refresh_token = generate_refresh_token(find_user)
# 返回成功响应,包含用户信息和 access token
response_data = find_user.copy() # 复制 find_user 字典,避免修改原始数据
response_data["access_token"] = access_token
response_data["refresh_token"] = refresh_token
return jsonify(response_data)
# 获取代码接口
@user_bp.route('/codes', methods=['GET'])
def get_codes():
"""获取用户代码接口"""
# 获取请求头中的 Authorization 字段
auth_header = request.headers.get('Authorization')
if not auth_header:
# 如果没有提供 Authorization 字段,返回 401 错误
return make_response(jsonify({"error": "Unauthorized"}), 401)
# 提取 token
try:
token = auth_header.split(" ")[1]
except IndexError:
# 如果 Authorization 字段格式错误,返回 401 错误
return make_response(jsonify({"error": "Unauthorized"}), 401)
# 验证 access token
userinfo = verify_access_token(token)
if not userinfo:
# 如果验证失败,返回 401 错误
return make_response(jsonify({"error": "Unauthorized"}), 401)
# 在模拟代码数据中查找该用户的代码
codes = next((item['codes'] for item in MOCK_CODES if item['username'] == userinfo['username']), [])
# 返回成功响应,包含代码数据
return jsonify(codes)
# 刷新 token 接口
@user_bp.route('/refresh', methods=['POST'])
def refresh_token():
"""刷新 token 接口"""
# 获取请求头中的 Authorization 字段
auth_header = request.headers.get('Authorization')
if not auth_header:
# 如果没有提供 Authorization 字段,返回 403 错误
return make_response(jsonify({"error": "Forbidden"}), 403)
# 提取 refresh token
try:
refresh_token = auth_header.split(" ")[1]
except IndexError:
# 如果 Authorization 字段格式错误,返回 403 错误
return make_response(jsonify({"error": "Forbidden"}), 403)
if not refresh_token:
# 如果没有提供 refresh token返回 403 错误
return make_response(jsonify({"error": "Forbidden"}), 403)
# 验证 refresh token
userinfo = verify_refresh_token(refresh_token)
if not userinfo:
# 如果验证失败,返回 403 错误
return make_response(jsonify({"error": "Forbidden"}), 403)
# 在模拟用户数据中查找该用户
find_user = next((user for user in MOCK_USERS if user['username'] == userinfo['username']), None)
if not find_user:
# 如果用户不存在,返回 403 错误
return make_response(jsonify({"error": "Forbidden"}), 403)
# 生成新的 access token
access_token = generate_access_token(find_user)
# 返回新的 access token
return jsonify({"access_token": access_token})
# 登出接口
@user_bp.route('/logout', methods=['POST'])
def logout():
"""用户登出接口"""
# 这里简单返回成功响应,因为在 Python 中没有类似清除 cookie 的操作示例
return jsonify({"message": "Logged out successfully"})