flask-libroro/old_routes/user_routes.py
2025-02-20 17:50:47 +08:00

209 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, request, jsonify, make_response
import jwt
from datetime import datetime, timedelta
import os
# 创建蓝图对象
user_bp = Blueprint('user', __name__)
# 模拟用户数据
MOCK_USERS = [
{
"username": "testuser",
"password": "testpassword"
}
]
# 模拟代码数据
MOCK_CODES = [
{
"username": "testuser",
"codes": ["code1", "code2"]
}
]
@user_bp.route('/info', methods=['GET'])
def get_user_info():
"""获取用户信息接口"""
# 此接口暂时返回一个简单信息,可根据实际需求完善
return jsonify({"message": "User info endpoint"})
# 定义生成 access token 的函数
def generate_access_token(user):
"""生成 access token"""
# 设置 token 过期时间为 15 分钟
expiration_time = datetime.utcnow() + timedelta(minutes=15)
# 生成 payload包含用户信息和过期时间
payload = {
'username': user['username'],
'exp': expiration_time
}
# 使用 jwt 库生成 token密钥从环境变量中获取
token = jwt.encode(payload, os.getenv('SECRET_KEY'), algorithm='HS256')
return token
# 定义生成 refresh token 的函数
def generate_refresh_token(user):
"""生成 refresh token"""
# 设置 refresh token 过期时间为 7 天
expiration_time = datetime.utcnow() + timedelta(days=7)
# 生成 payload包含用户信息和过期时间
payload = {
'username': user['username'],
'exp': expiration_time
}
# 使用 jwt 库生成 token密钥从环境变量中获取
token = jwt.encode(payload, os.getenv('SECRET_KEY'), algorithm='HS256')
return token
# 定义验证 access token 的函数
def verify_access_token(token):
"""验证 access token"""
try:
# 解码 token密钥从环境变量中获取算法为 HS256
data = jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=['HS256'])
# 从解码后的数据中获取用户名
username = data.get('username')
# 在模拟用户数据中查找该用户
user = next((user for user in MOCK_USERS if user['username'] == username), None)
return user
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
# 定义验证 refresh token 的函数
def verify_refresh_token(token):
"""验证 refresh token"""
try:
# 解码 token密钥从环境变量中获取算法为 HS256
data = jwt.decode(token, os.getenv('SECRET_KEY'), algorithms=['HS256'])
# 从解码后的数据中获取用户名
username = data.get('username')
# 在模拟用户数据中查找该用户
user = next((user for user in MOCK_USERS if user['username'] == username), None)
return user
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
# 登录接口
@user_bp.route('/login', methods=['POST'])
def login():
"""用户登录接口"""
# 获取请求中的 JSON 数据
data = request.get_json()
if not data:
# 如果请求中没有 JSON 数据,返回 400 错误
return make_response(jsonify({"error": "No JSON data provided"}), 400)
# 从 JSON 数据中获取用户名和密码
username = data.get('username')
password = data.get('password')
# 检查用户名和密码是否都存在
if not username or not password:
# 如果不存在,返回 400 错误
return make_response(jsonify({"error": "Username and password are required"}), 400)
# 在模拟用户数据中查找该用户
find_user = next((user for user in MOCK_USERS if user['username'] == username and user['password'] == password), None)
if not find_user:
# 如果用户不存在,返回 403 错误
return make_response(jsonify({"error": "Username or password is incorrect."}), 403)
# 生成 access token 和 refresh token
access_token = generate_access_token(find_user)
refresh_token = generate_refresh_token(find_user)
# 返回成功响应,包含用户信息和 access token
response_data = find_user.copy() # 复制 find_user 字典,避免修改原始数据
response_data["access_token"] = access_token
response_data["refresh_token"] = refresh_token
return jsonify(response_data)
# 获取代码接口
@user_bp.route('/codes', methods=['GET'])
def get_codes():
"""获取用户代码接口"""
# 获取请求头中的 Authorization 字段
auth_header = request.headers.get('Authorization')
if not auth_header:
# 如果没有提供 Authorization 字段,返回 401 错误
return make_response(jsonify({"error": "Unauthorized"}), 401)
# 提取 token
try:
token = auth_header.split(" ")[1]
except IndexError:
# 如果 Authorization 字段格式错误,返回 401 错误
return make_response(jsonify({"error": "Unauthorized"}), 401)
# 验证 access token
userinfo = verify_access_token(token)
if not userinfo:
# 如果验证失败,返回 401 错误
return make_response(jsonify({"error": "Unauthorized"}), 401)
# 在模拟代码数据中查找该用户的代码
codes = next((item['codes'] for item in MOCK_CODES if item['username'] == userinfo['username']), [])
# 返回成功响应,包含代码数据
return jsonify(codes)
# 刷新 token 接口
@user_bp.route('/refresh', methods=['POST'])
def refresh_token():
"""刷新 token 接口"""
# 获取请求头中的 Authorization 字段
auth_header = request.headers.get('Authorization')
if not auth_header:
# 如果没有提供 Authorization 字段,返回 403 错误
return make_response(jsonify({"error": "Forbidden"}), 403)
# 提取 refresh token
try:
refresh_token = auth_header.split(" ")[1]
except IndexError:
# 如果 Authorization 字段格式错误,返回 403 错误
return make_response(jsonify({"error": "Forbidden"}), 403)
if not refresh_token:
# 如果没有提供 refresh token返回 403 错误
return make_response(jsonify({"error": "Forbidden"}), 403)
# 验证 refresh token
userinfo = verify_refresh_token(refresh_token)
if not userinfo:
# 如果验证失败,返回 403 错误
return make_response(jsonify({"error": "Forbidden"}), 403)
# 在模拟用户数据中查找该用户
find_user = next((user for user in MOCK_USERS if user['username'] == userinfo['username']), None)
if not find_user:
# 如果用户不存在,返回 403 错误
return make_response(jsonify({"error": "Forbidden"}), 403)
# 生成新的 access token
access_token = generate_access_token(find_user)
# 返回新的 access token
return jsonify({"access_token": access_token})
# 登出接口
@user_bp.route('/logout', methods=['POST'])
def logout():
"""用户登出接口"""
# 这里简单返回成功响应,因为在 Python 中没有类似清除 cookie 的操作示例
return jsonify({"message": "Logged out successfully"})