selenium_elm_fengshen/run_window_v3.py

336 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import configparser
import json
import logging
import os
import re
from datetime import datetime
from time import sleep
import socketio
import websocket
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager
from webdriver_manager.firefox import GeckoDriverManager
import ssl
os.environ['WDM_SSL_VERIFY'] = "false"
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 读取配置文件
config = configparser.ConfigParser()
config.read('config.ini', encoding="utf-8")
# 从配置文件中读取参数
url_base = config.get('base', 'url_base')
url_login = r"https://mozi-login.alibaba-inc.com/?APP_NAME=LPD_TEAM_AEOLUS&BACK_URL=https%3A%2F%2Faeolus.ele.me"
url_home = config.get('base', 'url_home')
url_work = config.get('base', 'url_work')
account = config.get('base', 'account')
password = config.get('base', 'password')
interval = int(config.get('base', 'interval'))
driver_type = config.get('base', 'driver')
driver_path_edge = config.get('base', 'driver_path_edge')
driver_path_chrome = config.get('base', 'driver_path_chrome')
WEBSOCKET_URL = config.get('WebSocket', 'WEBSOCKET_URL')
key = config.get('web', 'key')
# 创建一个 Socket.IO 客户端实例
sio = socketio.Client()
# 定义连接事件处理器
@sio.event
def connect():
print('Connected to server')
# 定义断开连接事件处理器
@sio.event
def disconnect():
print('Disconnected from server')
# 定义接收服务器消息的事件处理器
@sio.on('message')
def on_message(data):
print('Received message:', data)
# 从配置文件中读取参数
host = config.get('WebSocket', 'host')
port = config.get('WebSocket', 'port')
# 连接到服务器
sio.connect("ws://"+host+":"+port)
# WebDriver初始化
def init_webdriver(driver_type):
options = Options()
manager = {
"firefox": GeckoDriverManager,
"edge": EdgeChromiumDriverManager,
"chrome": ChromeDriverManager
}[driver_type.lower()]
driver_path = None # 初始化为None以防安装失败
try:
driver_path = manager().install()
logging.info("找到驱动" + driver_path)
service = Service(executable_path=driver_path)
if driver_type.lower() == "firefox":
oprofile = webdriver.FirefoxOptions()
oprofile.accept_insecure_certs = True
driver = webdriver.Firefox(service=service, options=oprofile)
elif driver_type.lower() == "edge":
driver = webdriver.Edge(service=service, options=options)
else: # chrome
options.add_argument('--ignore-certificate-errors')
driver = webdriver.Chrome(service=service, options=options)
logging.info(f"成功 - 使用{driver_type}")
return driver
except Exception as e:
logging.error(f"无法创建WebDriver实例: {e}")
if driver_type.lower() == "edge" and driver_path_edge: # 只有在Edge类型且路径已知的情况下才尝试使用已存在的驱动
edge_options = Options() # 创建Edge的Options实例
service = Service(executable_path=driver_path_edge) # 使用已知的驱动路径
driver = webdriver.Edge(service=service, options=edge_options) # 传入Options实例
logging.info("使用自行安装的Edge")
return driver
else:
edge_options = Options() # 创建Edge的Options实例
service = Service(executable_path=driver_path_chrome) # 使用已知的驱动路径
driver = webdriver.Edge(service=service, options=edge_options) # 传入Options实例
logging.info("使用自行安装的Chrome")
return driver
# 登录流程
def login(driver, url_login, account, password):
driver.get(url_login)
WebDriverWait(driver, 10).until(EC.url_to_be(url_login))
username_field = driver.find_element(By.NAME, 'domainAccount')
username_field.clear()
username_field.send_keys(account)
password_field = driver.find_element(By.NAME, 'password')
password_field.clear()
password_field.send_keys(password)
login_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CLASS_NAME, 'sso-btn-submit'))
)
login_button.click()
WebDriverWait(driver, 10).until_not(EC.url_to_be(url_login))
logging.info("登录成功!")
# 将在线时长转换为分钟
def parse_online_time(online_time_str):
# 如果字符串中同时包含小时和分钟
if "小时" in online_time_str and "分钟" in online_time_str:
match = re.search(r'(\d+)小时(\d+)分钟', online_time_str)
if match:
hours, minutes = match.groups()
return int(hours) * 60 + int(minutes)
# 如果字符串中只包含小时
elif "小时" in online_time_str:
match = re.search(r'(\d+)小时', online_time_str)
if match:
hours = match.group(1)
return int(hours) * 60
# 如果字符串中只包含分钟
elif "分钟" in online_time_str:
match = re.search(r'(\d+)分钟', online_time_str)
if match:
minutes = match.group(1)
return int(minutes)
# 如果字符串中既不包含小时也不包含分钟则返回0
else:
return 0
# 主要逻辑
def main():
driver = init_webdriver(driver_type)
if driver is None:
logging.error("驱动初始化失败")
return "驱动异常"
else:
logging.info("驱动初始化完毕")
driver.get(url_base) # 开始进入网页
try:
while driver.window_handles:
if driver.current_url == url_login:
sleep(3)
# 填写手机号码
username_field = driver.find_element(By.NAME, 'domainAccount')
username_field.clear()
username_field.send_keys(account)
# 填写密码
password_field = driver.find_element(By.NAME, 'password')
password_field.clear()
password_field.send_keys(password)
# 使用WebDriverWait等待登录按钮变为可点击状态
login_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CLASS_NAME, 'sso-btn-submit'))
)
login_button.click()
logging.info("登录完成")
# 等待一段时间,确保登录过程完成
sleep(3)
if driver.current_url == url_home:
logging.info("已经进入后台主页")
sleep(1) # 等待加载,跳转到工作目录
driver.get(url_work)
sleep(1) # 等待加载,跳转到工作目录
logging.info("已经跳转到工作页面;"+driver.current_url)
if driver.current_url == url_work:
sleep(3)
logging.info("开始提取数据")
# 任务1: 提取目标在线时长和目标完单量
logging.info("任务1提取目标在线时长和目标完单量")
target_info_element = driver.find_element(By.XPATH,
'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[1]/div/div[1]/div/div/div[2]/span')
target_info_text = target_info_element.text
logging.info(target_info_text)
# target_online_time = int(re.search(r'(\d+)小时(\d+)分钟', target_info_text).group(1)) * 60 + int(
# re.search(r'(\d+)小时(\d+)分钟', target_info_text).group(2))
#
match = re.search(r'(?P<hours>\d+)?小时?(?P<minutes>\d+)?分钟?', target_info_text)
if match:
target_online_time = int(match.group(1)) * 60 + int(match.group(2))
else:
logging.error("无法从文本中提取目标在线时长信息")
sleep(2)
try:
match = re.search(r'(\S*)', target_info_text.replace(" ",""))
if match:
extracted_text = match.group(1)
target_online_time =parse_online_time(extracted_text)
else:
target_online_time=0
except Exception as e:
logging.error(e)
continue
logging.info("目标时间min"+str(target_online_time))
target_order_completion = int(re.search(r'目标完单量:(\d+)', target_info_text).group(1))
logging.info("目标单量:"+str(target_order_completion))
# 任务2: 提取表格数据并确定目标列的索引
logging.info("任务2提取表格数据并确定目标列的索引")
headers =[]
for i in range(1, 13): # 假设有12列
logging.info("开始查找表头:"+str(i))
# 使用字符串格式化构建XPath表达式
xpath_expression = f'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div'
# 使用XPath表达式查找元素
header_element = driver.find_element(By.XPATH, xpath_expression)
if header_element.text=="" or header_element.text==None:
logging.error("方法1 失败")
xpath_expression = f'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]'
header_element = driver.find_element(By.XPATH, xpath_expression)
# if header_element.text=="" or header_element.text==None:
# logging.error("方法2 失败")
# xpath_expression = f'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]/div/div'
# header_element = driver.find_element(By.XPATH, xpath_expression)
if header_element.text=="" or header_element.text==None:
logging.error("方法3 失败")
xpath_expression = f'/html/body/div[1]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]'
header_element = driver.find_element(By.XPATH, xpath_expression)
if header_element.text == "" or header_element.text == None:
logging.error("方法4 失败")
logging.error("依旧没有找到内容"+str(i))
logging.error("开始手动补齐")
if i ==8:
headers.append("全天完单量")
elif i==9:
headers.append("时段内在线时长")
elif i == 10:
headers.append("时段内背单时长")
elif i == 11:
headers.append("时段内完单量")
elif i == 12:
headers.append("配送中单量")
else:
logging.error(str(i)+" : "+str(header_element.text))
headers.append(header_element.text.replace(" ",""))
header_to_index = {header: index for index, header in enumerate(headers)}
online_time_header = "全天在线时长"
order_completion_header = "全天完单量"
logging.info("获取表头如下:")
logging.info(headers)
# 任务3: 根据表格数据和目标值,筛选出不合格的人
logging.info("任务3根据表格数据和目标值筛选出不合格的人")
unqualified_persons = []
rows = driver.find_elements(By.XPATH, '//tbody/tr')
for row in rows:
cells = row.find_elements(By.TAG_NAME, 'td')
row_data = [cell.text for cell in cells]
name = row_data[header_to_index["姓名"]]
online_time = row_data[header_to_index[online_time_header]]
order_completion = row_data[header_to_index[order_completion_header]]
# 将在线时长转换为分钟
total_online_time = parse_online_time(online_time)
# 比较在线时长和完单量
if total_online_time < target_online_time or int(order_completion) < target_order_completion:
unqualified_persons.append(row_data)
# logging.info(
# f"目标在线时长: {target_online_time // 60}小时{target_online_time % 60}分钟, 目标完单量: {target_order_completion}")
# # logging.info("不合格人员名单:", unqualified_persons)
# 发送数据到WebSocket
data_to_send = {
"目标时间": target_online_time,
"目标工作量": target_order_completion,
"未及格成员": unqualified_persons
}
sio.emit("data", json.dumps(data_to_send))
logging.info("准备工作……休息中……请等待休息完毕……")
driver.get(url_home) # 返回主页 避免登录失效
sleep(interval)
except Exception as e:
# 捕获异常比如网络问题或其他Selenium错误
print(f"An exception occurred: {e}")
sio.emit("log", {"err":"网络错误"})
finally:
sio.emit("log", {"info":"关闭连接"})
sio.disconnect()
# 确保在退出前关闭浏览器
driver.quit()
if __name__ == '__main__':
# 获取当前日期
today = datetime.now().date()
# 计算今年的8月10日
this_years_cutoff = datetime(today.year, 8, 10).date()
# 检查今天是否超过了截止日期
if today > this_years_cutoff:
print("软件已到期,请联系供应商更新许可证。")
else:
# 计算距离到期的天数
days_until_expiration = (this_years_cutoff - today).days
# 日志信息
logging.info(f"软件距离到期还有{days_until_expiration}")
data_to_send = {"info": "开始连接"}
sio.emit('log', data_to_send)
main()