336 lines
15 KiB
Python
336 lines
15 KiB
Python
import configparser
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
from datetime import datetime
|
||
from time import sleep
|
||
|
||
import socketio
|
||
import websocket
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from selenium import webdriver
|
||
from selenium.webdriver.chrome.options import Options
|
||
from selenium.webdriver.chrome.service import Service
|
||
from webdriver_manager.chrome import ChromeDriverManager
|
||
from webdriver_manager.microsoft import EdgeChromiumDriverManager
|
||
from webdriver_manager.firefox import GeckoDriverManager
|
||
import ssl
|
||
os.environ['WDM_SSL_VERIFY'] = "false"
|
||
|
||
# 配置日志
|
||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||
|
||
# 读取配置文件
|
||
config = configparser.ConfigParser()
|
||
config.read('config.ini', encoding="utf-8")
|
||
|
||
# 从配置文件中读取参数
|
||
url_base = config.get('base', 'url_base')
|
||
url_login = r"https://mozi-login.alibaba-inc.com/?APP_NAME=LPD_TEAM_AEOLUS&BACK_URL=https%3A%2F%2Faeolus.ele.me"
|
||
url_home = config.get('base', 'url_home')
|
||
url_work = config.get('base', 'url_work')
|
||
account = config.get('base', 'account')
|
||
password = config.get('base', 'password')
|
||
interval = int(config.get('base', 'interval'))
|
||
driver_type = config.get('base', 'driver')
|
||
driver_path_edge = config.get('base', 'driver_path_edge')
|
||
driver_path_chrome = config.get('base', 'driver_path_chrome')
|
||
WEBSOCKET_URL = config.get('WebSocket', 'WEBSOCKET_URL')
|
||
key = config.get('web', 'key')
|
||
# 创建一个 Socket.IO 客户端实例
|
||
sio = socketio.Client()
|
||
|
||
# 定义连接事件处理器
|
||
@sio.event
|
||
def connect():
|
||
print('Connected to server')
|
||
|
||
# 定义断开连接事件处理器
|
||
@sio.event
|
||
def disconnect():
|
||
print('Disconnected from server')
|
||
|
||
# 定义接收服务器消息的事件处理器
|
||
@sio.on('message')
|
||
def on_message(data):
|
||
print('Received message:', data)
|
||
# 从配置文件中读取参数
|
||
host = config.get('WebSocket', 'host')
|
||
port = config.get('WebSocket', 'port')
|
||
# 连接到服务器
|
||
sio.connect("ws://"+host+":"+port)
|
||
# WebDriver初始化
|
||
def init_webdriver(driver_type):
|
||
options = Options()
|
||
manager = {
|
||
"firefox": GeckoDriverManager,
|
||
"edge": EdgeChromiumDriverManager,
|
||
"chrome": ChromeDriverManager
|
||
}[driver_type.lower()]
|
||
|
||
driver_path = None # 初始化为None,以防安装失败
|
||
try:
|
||
driver_path = manager().install()
|
||
logging.info("找到驱动" + driver_path)
|
||
service = Service(executable_path=driver_path)
|
||
|
||
if driver_type.lower() == "firefox":
|
||
oprofile = webdriver.FirefoxOptions()
|
||
oprofile.accept_insecure_certs = True
|
||
driver = webdriver.Firefox(service=service, options=oprofile)
|
||
elif driver_type.lower() == "edge":
|
||
driver = webdriver.Edge(service=service, options=options)
|
||
else: # chrome
|
||
options.add_argument('--ignore-certificate-errors')
|
||
driver = webdriver.Chrome(service=service, options=options)
|
||
|
||
logging.info(f"成功 - 使用{driver_type}")
|
||
return driver
|
||
|
||
except Exception as e:
|
||
logging.error(f"无法创建WebDriver实例: {e}")
|
||
if driver_type.lower() == "edge" and driver_path_edge: # 只有在Edge类型且路径已知的情况下才尝试使用已存在的驱动
|
||
edge_options = Options() # 创建Edge的Options实例
|
||
service = Service(executable_path=driver_path_edge) # 使用已知的驱动路径
|
||
driver = webdriver.Edge(service=service, options=edge_options) # 传入Options实例
|
||
logging.info("使用自行安装的Edge")
|
||
return driver
|
||
else:
|
||
edge_options = Options() # 创建Edge的Options实例
|
||
service = Service(executable_path=driver_path_chrome) # 使用已知的驱动路径
|
||
driver = webdriver.Edge(service=service, options=edge_options) # 传入Options实例
|
||
logging.info("使用自行安装的Chrome")
|
||
return driver
|
||
|
||
# 登录流程
|
||
def login(driver, url_login, account, password):
|
||
driver.get(url_login)
|
||
WebDriverWait(driver, 10).until(EC.url_to_be(url_login))
|
||
|
||
username_field = driver.find_element(By.NAME, 'domainAccount')
|
||
username_field.clear()
|
||
username_field.send_keys(account)
|
||
|
||
password_field = driver.find_element(By.NAME, 'password')
|
||
password_field.clear()
|
||
password_field.send_keys(password)
|
||
|
||
login_button = WebDriverWait(driver, 10).until(
|
||
EC.element_to_be_clickable((By.CLASS_NAME, 'sso-btn-submit'))
|
||
)
|
||
login_button.click()
|
||
|
||
WebDriverWait(driver, 10).until_not(EC.url_to_be(url_login))
|
||
logging.info("登录成功!")
|
||
# 将在线时长转换为分钟
|
||
def parse_online_time(online_time_str):
|
||
# 如果字符串中同时包含小时和分钟
|
||
if "小时" in online_time_str and "分钟" in online_time_str:
|
||
match = re.search(r'(\d+)小时(\d+)分钟', online_time_str)
|
||
if match:
|
||
hours, minutes = match.groups()
|
||
return int(hours) * 60 + int(minutes)
|
||
|
||
# 如果字符串中只包含小时
|
||
elif "小时" in online_time_str:
|
||
match = re.search(r'(\d+)小时', online_time_str)
|
||
if match:
|
||
hours = match.group(1)
|
||
return int(hours) * 60
|
||
|
||
# 如果字符串中只包含分钟
|
||
elif "分钟" in online_time_str:
|
||
match = re.search(r'(\d+)分钟', online_time_str)
|
||
if match:
|
||
minutes = match.group(1)
|
||
return int(minutes)
|
||
|
||
# 如果字符串中既不包含小时也不包含分钟,则返回0
|
||
else:
|
||
return 0
|
||
# 主要逻辑
|
||
def main():
|
||
driver = init_webdriver(driver_type)
|
||
if driver is None:
|
||
logging.error("驱动初始化失败")
|
||
return "驱动异常"
|
||
else:
|
||
logging.info("驱动初始化完毕")
|
||
|
||
driver.get(url_base) # 开始进入网页
|
||
try:
|
||
while driver.window_handles:
|
||
if driver.current_url == url_login:
|
||
sleep(3)
|
||
# 填写手机号码
|
||
username_field = driver.find_element(By.NAME, 'domainAccount')
|
||
username_field.clear()
|
||
username_field.send_keys(account)
|
||
# 填写密码
|
||
password_field = driver.find_element(By.NAME, 'password')
|
||
password_field.clear()
|
||
password_field.send_keys(password)
|
||
|
||
# 使用WebDriverWait等待登录按钮变为可点击状态
|
||
login_button = WebDriverWait(driver, 10).until(
|
||
EC.element_to_be_clickable((By.CLASS_NAME, 'sso-btn-submit'))
|
||
)
|
||
login_button.click()
|
||
logging.info("登录完成")
|
||
# 等待一段时间,确保登录过程完成
|
||
sleep(3)
|
||
if driver.current_url == url_home:
|
||
logging.info("已经进入后台主页")
|
||
sleep(1) # 等待加载,跳转到工作目录
|
||
driver.get(url_work)
|
||
sleep(1) # 等待加载,跳转到工作目录
|
||
logging.info("已经跳转到工作页面;"+driver.current_url)
|
||
if driver.current_url == url_work:
|
||
|
||
sleep(3)
|
||
logging.info("开始提取数据")
|
||
# 任务1: 提取目标在线时长和目标完单量
|
||
logging.info("任务1:提取目标在线时长和目标完单量")
|
||
target_info_element = driver.find_element(By.XPATH,
|
||
'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[1]/div/div[1]/div/div/div[2]/span')
|
||
target_info_text = target_info_element.text
|
||
logging.info(target_info_text)
|
||
# target_online_time = int(re.search(r'(\d+)小时(\d+)分钟', target_info_text).group(1)) * 60 + int(
|
||
# re.search(r'(\d+)小时(\d+)分钟', target_info_text).group(2))
|
||
#
|
||
match = re.search(r'(?P<hours>\d+)?小时?(?P<minutes>\d+)?分钟?', target_info_text)
|
||
if match:
|
||
target_online_time = int(match.group(1)) * 60 + int(match.group(2))
|
||
else:
|
||
logging.error("无法从文本中提取目标在线时长信息")
|
||
sleep(2)
|
||
try:
|
||
match = re.search(r':(\S*),', target_info_text.replace(" ",""))
|
||
if match:
|
||
extracted_text = match.group(1)
|
||
target_online_time =parse_online_time(extracted_text)
|
||
else:
|
||
target_online_time=0
|
||
except Exception as e:
|
||
logging.error(e)
|
||
continue
|
||
|
||
|
||
logging.info("目标时间(min):"+str(target_online_time))
|
||
target_order_completion = int(re.search(r'目标完单量:(\d+)', target_info_text).group(1))
|
||
logging.info("目标单量:"+str(target_order_completion))
|
||
# 任务2: 提取表格数据并确定目标列的索引
|
||
logging.info("任务2:提取表格数据并确定目标列的索引")
|
||
headers =[]
|
||
for i in range(1, 13): # 假设有12列
|
||
logging.info("开始查找表头:"+str(i))
|
||
# 使用字符串格式化构建XPath表达式
|
||
xpath_expression = f'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div'
|
||
# 使用XPath表达式查找元素
|
||
header_element = driver.find_element(By.XPATH, xpath_expression)
|
||
|
||
if header_element.text=="" or header_element.text==None:
|
||
logging.error("方法1 失败")
|
||
xpath_expression = f'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]'
|
||
header_element = driver.find_element(By.XPATH, xpath_expression)
|
||
# if header_element.text=="" or header_element.text==None:
|
||
# logging.error("方法2 失败")
|
||
# xpath_expression = f'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]/div/div'
|
||
# header_element = driver.find_element(By.XPATH, xpath_expression)
|
||
if header_element.text=="" or header_element.text==None:
|
||
logging.error("方法3 失败")
|
||
xpath_expression = f'/html/body/div[1]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]'
|
||
header_element = driver.find_element(By.XPATH, xpath_expression)
|
||
if header_element.text == "" or header_element.text == None:
|
||
logging.error("方法4 失败")
|
||
logging.error("依旧没有找到内容"+str(i))
|
||
logging.error("开始手动补齐")
|
||
if i ==8:
|
||
headers.append("全天完单量")
|
||
elif i==9:
|
||
headers.append("时段内在线时长")
|
||
elif i == 10:
|
||
headers.append("时段内背单时长")
|
||
elif i == 11:
|
||
headers.append("时段内完单量")
|
||
elif i == 12:
|
||
headers.append("配送中单量")
|
||
else:
|
||
logging.error(str(i)+" : "+str(header_element.text))
|
||
headers.append(header_element.text.replace(" ",""))
|
||
header_to_index = {header: index for index, header in enumerate(headers)}
|
||
online_time_header = "全天在线时长"
|
||
order_completion_header = "全天完单量"
|
||
logging.info("获取表头如下:")
|
||
logging.info(headers)
|
||
|
||
# 任务3: 根据表格数据和目标值,筛选出不合格的人
|
||
logging.info("任务3:根据表格数据和目标值,筛选出不合格的人")
|
||
unqualified_persons = []
|
||
rows = driver.find_elements(By.XPATH, '//tbody/tr')
|
||
for row in rows:
|
||
cells = row.find_elements(By.TAG_NAME, 'td')
|
||
row_data = [cell.text for cell in cells]
|
||
name = row_data[header_to_index["姓名"]]
|
||
online_time = row_data[header_to_index[online_time_header]]
|
||
order_completion = row_data[header_to_index[order_completion_header]]
|
||
|
||
# 将在线时长转换为分钟
|
||
total_online_time = parse_online_time(online_time)
|
||
|
||
# 比较在线时长和完单量
|
||
if total_online_time < target_online_time or int(order_completion) < target_order_completion:
|
||
|
||
unqualified_persons.append(row_data)
|
||
|
||
# logging.info(
|
||
# f"目标在线时长: {target_online_time // 60}小时{target_online_time % 60}分钟, 目标完单量: {target_order_completion}")
|
||
# # logging.info("不合格人员名单:", unqualified_persons)
|
||
# 发送数据到WebSocket
|
||
data_to_send = {
|
||
"目标时间": target_online_time,
|
||
"目标工作量": target_order_completion,
|
||
"未及格成员": unqualified_persons
|
||
}
|
||
|
||
sio.emit(key, json.dumps(data_to_send))
|
||
logging.info("准备工作……休息中……请等待休息完毕……")
|
||
driver.get(url_home) # 返回主页 避免登录失效
|
||
|
||
sleep(interval)
|
||
|
||
except Exception as e:
|
||
# 捕获异常,比如网络问题或其他Selenium错误
|
||
print(f"An exception occurred: {e}")
|
||
sio.emit("log_"+key, {"err":"网络错误"})
|
||
|
||
finally:
|
||
sio.emit("log_"+key, {"info":"关闭连接"})
|
||
sio.disconnect()
|
||
# 确保在退出前关闭浏览器
|
||
driver.quit()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# 获取当前日期
|
||
today = datetime.now().date()
|
||
|
||
# 计算今年的8月10日
|
||
this_years_cutoff = datetime(today.year, 8, 10).date()
|
||
|
||
# 检查今天是否超过了截止日期
|
||
if today > this_years_cutoff:
|
||
print("软件已到期,请联系供应商更新许可证。")
|
||
else:
|
||
# 计算距离到期的天数
|
||
days_until_expiration = (this_years_cutoff - today).days
|
||
|
||
# 日志信息
|
||
logging.info(f"软件距离到期还有{days_until_expiration}天")
|
||
|
||
data_to_send = {"info": "开始连接"}
|
||
sio.emit('log_'+key, data_to_send)
|
||
main() |