GetQzonehistory/fetch_all_message.py
2024-09-15 10:55:35 +08:00

214 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import math
import os
import re
import sys
import time
import requests
from util import LoginUtil
# Directory that write_txt_file(), read_txt_file() and get_image() read from / write to.
WORKDIR = "./resource/fetch-all/"
# Cache file (inside WORKDIR) holding the response of a one-item request; its 'total' field is read.
MESSAGE_SAMPLE = 'msg-one.json'
# Cache file (inside WORKDIR) holding the merged "msglist" of all fetched pages.
MESSAGE_ALL = 'msg-all.json'
# Login cookies read by get_msg_list(); filled from LoginUtil.cookie() by get_visible_msg_list().
cookies = None
def _render_message_markdown(item):
    """Render one entry of the "msglist" payload as a Markdown section.

    Every attached picture is downloaded through get_image() as a side
    effect and linked from the returned text.

    Args:
        item: dict describing a single post, as found in "msglist".

    Returns:
        The Markdown text for the post, terminated by a blank line.
    """
    content = item.get('content') or ""
    author_name = item['name']
    created = format_timestamp(item['created_time'])
    source_name = item.get('source_name')
    source_line = '\n来自 ' + source_name if source_name else ""

    # Pictures: a missing or null 'pic' entry simply means "no pictures".
    picture_links = []
    for index, picture in enumerate(item.get('pic') or []):
        file_name = f"{item['tid']}{index}.jpeg"
        get_image(picture['url1'], file_name)
        picture_links.append(f"![{file_name}](./{file_name})")
    pictures_md = "".join(picture_links)

    parts = [f"## {author_name} {created} \n{content} {pictures_md} \n{source_line}"]

    # Forwarded (re-posted) content is quoted below the post itself.
    if 'rt_tid' in item:
        rt_content = item['rt_con']['content']
        rt_name = item['rt_uinname']
        rt_uin = item['rt_uin']
        parts.append(f"\n> {rt_name} - {rt_uin} : {rt_content}")

    # Comments: an empty list still prints the "0 comments" header, while a
    # null value is treated like an absent key instead of crashing in len().
    comments = item.get('commentlist')
    if comments is not None:
        parts.append(f"\n💬 **{len(comments)}条评论回复**\n")
        for comment in comments:
            comment_text = comment['content']
            comment_time = comment['createTime2']
            comment_name = comment['name']
            comment_uin = comment['uin']
            parts.append(f"- {comment_name}({comment_uin}) : {comment_text} - {comment_time}\n")

    parts.append("\n\n")
    return "".join(parts)


def get_visible_msg_list():
    """Export all fetched posts, with their pictures, to a Markdown file.

    Original author's note (translated): fetch all visible, undeleted posts
    plus high-resolution pictures, including posts from before 2014.

    Steps:
        1. Read the total post count from the cached sample response
           (MESSAGE_SAMPLE), requesting a one-item page first if the cache
           file does not exist.
        2. Load every post from the MESSAGE_ALL cache, or page through
           get_msg_list() 30 posts at a time and create that cache.
        3. Render each post as Markdown and write the result to
           "所有可见说说.md" inside WORKDIR.

    Exits the process with status 1 when a cached payload is not valid JSON.
    """
    global cookies
    if cookies is None:
        cookies = LoginUtil.cookie()

    # 1. Total number of posts.
    try:
        sample_text = read_txt_file(MESSAGE_SAMPLE)
    except FileNotFoundError:
        # No cached sample yet: request a single post and cache the response.
        write_txt_file(MESSAGE_SAMPLE, get_msg_list(1))
        sample_text = read_txt_file(MESSAGE_SAMPLE)
    try:
        total_count = json.loads(sample_text)['total']
        print(f'你的未删除说说总条数{total_count}')
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}")
        sys.exit(1)

    # 2. All post data.
    try:
        all_text = read_txt_file(MESSAGE_ALL)
    except FileNotFoundError:
        # No cache yet: fetch every page and merge the results.
        page_size = 30
        total_pages = math.ceil(total_count / page_size)
        all_messages = []
        print(f"一共{total_pages}")
        for page_index in range(total_pages):
            offset = page_index * page_size
            print(
                f"一页{page_size}条, 获取第{page_index + 1}")
            page_text = get_msg_list(page_size, offset)
            # Guard: a page without a usable "msglist" (missing or null)
            # contributes nothing instead of raising inside extend().
            all_messages.extend(json.loads(page_text).get("msglist") or [])
        all_text = json.dumps({"msglist": all_messages}, ensure_ascii=False, indent=2)
        write_txt_file(MESSAGE_ALL, all_text)
    try:
        messages = json.loads(all_text)['msglist']
        print(f'已获取到数据的说说总条数{len(messages)}')
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}")
        sys.exit(1)

    # 3. Convert the raw JSON into Markdown and write it out in one go.
    markdown_content = "".join(_render_message_markdown(item) for item in messages)
    write_txt_file("所有可见说说.md", markdown_content)
def get_msg_list(pageSize, offset=0):
    """Request one page of posts from the Qzone "msglist" endpoint.

    Reads the module-level ``cookies`` mapping, which must already be
    populated (get_visible_msg_list() does this before calling).

    Args:
        pageSize: value sent as the ``num`` query parameter.
        offset: value sent as the ``pos`` query parameter (default 0).

    Returns:
        The response payload re-serialised as indented JSON text with
        non-ASCII characters kept as-is.

    Exits the process with status 1 when the HTTP request fails, the body
    is not valid JSON, or the payload's ``code`` field is not 0.
    """
    url = 'https://user.qzone.qq.com/proxy/domain/taotao.qq.com/cgi-bin/emotion_cgi_msglist_v6'
    g_tk = LoginUtil.bkn(cookies.get('p_skey'))
    # Presumably the 'uin' cookie looks like "o0123456": drop the leading
    # 'o' and zero padding only, never an "o0" sequence further inside.
    qqNumber = re.sub(r'^o0*', '', cookies.get('uin'))
    skey = cookies.get('skey')
    p_uin = cookies.get('p_uin')
    pt4_token = cookies.get('pt4_token')
    p_skey = cookies.get('p_skey')
    headers = {
        'accept': '*/*',
        'accept-language': 'en-US,en;q=0.9',
        'cookie': f'uin={p_uin};skey={skey};p_uin={p_uin};pt4_token={pt4_token};p_skey={p_skey}',
        'priority': 'u=1, i',
        'referer': f'https://user.qzone.qq.com/{qqNumber}/main',
        'sec-ch-ua': '"Not;A=Brand";v="24", "Chromium";v="128"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Linux"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
    }
    params = {
        'uin': f'{qqNumber}',
        'ftype': '0',
        'sort': '0',
        'pos': f'{offset}',
        'num': f'{pageSize}',
        'replynum': '100',
        'g_tk': f'{g_tk}',
        'callback': '_preloadCallback',
        'code_version': '1',
        'format': 'jsonp',
        'need_private_comment': '1'
    }
    try:
        # A timeout keeps a stalled connection from hanging the export forever.
        response = requests.get(url, headers=headers, params=params, timeout=30)
    except requests.RequestException as e:
        # Without a response there is nothing to parse; stop here instead of
        # failing later on an unbound variable.
        print(e)
        sys.exit(1)
    # Strip the _preloadCallback(...) JSONP wrapper and keep the JSON inside.
    raw_txt = re.sub(r'^_preloadCallback\((.*)\);?$', r'\1', response.text.strip(), flags=re.S)
    # Parse and re-serialise so escapes carried by the raw response
    # (e.g. http:\/\/) are normalised in the returned text.
    try:
        json_dict = json.loads(raw_txt)
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}")
        sys.exit(1)
    if json_dict.get('code') != 0:
        print(f"错误 {json_dict.get('message')}")
        sys.exit(1)
    return json.dumps(json_dict, indent=2, ensure_ascii=False)
def write_txt_file(file_name, data):
    """Write text to a file inside WORKDIR, creating the directory if needed.

    Args:
        file_name: name of the file, joined onto WORKDIR.
        data: text to write; any existing file content is replaced.
    """
    # exist_ok=True replaces the separate exists() check, which could race
    # with another process creating the directory in between.
    os.makedirs(WORKDIR, exist_ok=True)
    target_path = os.path.join(WORKDIR, file_name)
    with open(target_path, 'w', encoding='utf-8') as file:
        file.write(data)
def read_txt_file(file_name):
    """Return the UTF-8 text content of a file inside WORKDIR.

    Args:
        file_name: name of the file, joined onto WORKDIR.

    Returns:
        The whole file content as a string.

    Raises:
        FileNotFoundError: when the joined path does not exist.
    """
    target_path = os.path.join(WORKDIR, file_name)
    # Guard clause: report a missing file before attempting to open it.
    if not os.path.exists(target_path):
        raise FileNotFoundError(f"文件 {target_path} 不存在")
    with open(target_path, 'r', encoding='utf-8') as handle:
        text = handle.read()
    return text
def format_timestamp(timestamp):
    """Format a Unix timestamp as local time, e.g. "2024年09月15日 10:55:35".

    Args:
        timestamp: seconds since the epoch, as accepted by time.localtime().

    Returns:
        The formatted local date and time string.
    """
    time_struct = time.localtime(timestamp)
    # The day is followed by "日" and a space so it does not run straight
    # into the hour (previously rendered like "09月1510:55:35").
    return time.strftime("%Y年%m月%d日 %H:%M:%S", time_struct)
def get_image(url, img_name):
    """Download an image and save it inside WORKDIR.

    Best effort: a failed request or a non-200 status is reported on stdout
    and the function returns without writing a file, so one bad picture does
    not abort the caller.

    Args:
        url: address of the image to download.
        img_name: file name to save the image under, joined onto WORKDIR.
    """
    headers = {
        'sec-ch-ua': '"Not;A=Brand";v="24", "Chromium";v="128"',
        'Referer': 'https://user.qzone.qq.com/',
        'sec-ch-ua-mobile': '?0',
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
        'sec-ch-ua-platform': '"Linux"',
    }
    try:
        # A timeout keeps a stalled download from hanging the export forever.
        response = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException as e:
        # Network errors get the same best-effort treatment as HTTP errors.
        print(f'请求失败:{e}')
        return
    if response.status_code != 200:
        print(f'请求失败,状态码:{response.status_code}')
        return
    # Make sure the target directory exists before writing into it.
    os.makedirs(WORKDIR, exist_ok=True)
    file_path = os.path.join(WORKDIR, img_name)
    with open(file_path, 'wb') as file:
        file.write(response.content)
    print('图片下载成功')
# Script entry point: run the full export only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    get_visible_msg_list()