selenium_elm_fengshen/run_window_v3.py

336 lines
15 KiB
Python
Raw Normal View History

2024-07-22 08:47:35 +00:00
import configparser
2024-07-22 13:45:14 +00:00
import json
2024-07-22 08:47:35 +00:00
import logging
import os
import re
2024-07-22 13:45:14 +00:00
from datetime import datetime
2024-07-22 08:47:35 +00:00
from time import sleep
2024-07-22 13:45:14 +00:00
import socketio
import websocket
2024-07-22 08:47:35 +00:00
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager
from webdriver_manager.firefox import GeckoDriverManager
import ssl
os.environ['WDM_SSL_VERIFY'] = "false"
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 读取配置文件
config = configparser.ConfigParser()
config.read('config.ini', encoding="utf-8")
# 从配置文件中读取参数
url_base = config.get('base', 'url_base')
url_login = r"https://mozi-login.alibaba-inc.com/?APP_NAME=LPD_TEAM_AEOLUS&BACK_URL=https%3A%2F%2Faeolus.ele.me"
url_home = config.get('base', 'url_home')
url_work = config.get('base', 'url_work')
account = config.get('base', 'account')
password = config.get('base', 'password')
interval = int(config.get('base', 'interval'))
driver_type = config.get('base', 'driver')
driver_path_edge = config.get('base', 'driver_path_edge')
driver_path_chrome = config.get('base', 'driver_path_chrome')
2024-07-22 13:45:14 +00:00
WEBSOCKET_URL = config.get('WebSocket', 'WEBSOCKET_URL')
key = config.get('web', 'key')
# 创建一个 Socket.IO 客户端实例
sio = socketio.Client()
# 定义连接事件处理器
@sio.event
def connect():
print('Connected to server')
# 定义断开连接事件处理器
@sio.event
def disconnect():
print('Disconnected from server')
# 定义接收服务器消息的事件处理器
@sio.on('message')
def on_message(data):
print('Received message:', data)
# 从配置文件中读取参数
host = config.get('WebSocket', 'host')
port = config.get('WebSocket', 'port')
# 连接到服务器
sio.connect("ws://"+host+":"+port)
2024-07-22 08:47:35 +00:00
# WebDriver初始化
def init_webdriver(driver_type):
options = Options()
manager = {
"firefox": GeckoDriverManager,
"edge": EdgeChromiumDriverManager,
"chrome": ChromeDriverManager
}[driver_type.lower()]
driver_path = None # 初始化为None以防安装失败
try:
driver_path = manager().install()
logging.info("找到驱动" + driver_path)
service = Service(executable_path=driver_path)
if driver_type.lower() == "firefox":
oprofile = webdriver.FirefoxOptions()
oprofile.accept_insecure_certs = True
driver = webdriver.Firefox(service=service, options=oprofile)
elif driver_type.lower() == "edge":
driver = webdriver.Edge(service=service, options=options)
else: # chrome
options.add_argument('--ignore-certificate-errors')
driver = webdriver.Chrome(service=service, options=options)
logging.info(f"成功 - 使用{driver_type}")
return driver
except Exception as e:
logging.error(f"无法创建WebDriver实例: {e}")
if driver_type.lower() == "edge" and driver_path_edge: # 只有在Edge类型且路径已知的情况下才尝试使用已存在的驱动
edge_options = Options() # 创建Edge的Options实例
service = Service(executable_path=driver_path_edge) # 使用已知的驱动路径
driver = webdriver.Edge(service=service, options=edge_options) # 传入Options实例
logging.info("使用自行安装的Edge")
return driver
else:
edge_options = Options() # 创建Edge的Options实例
service = Service(executable_path=driver_path_chrome) # 使用已知的驱动路径
driver = webdriver.Edge(service=service, options=edge_options) # 传入Options实例
logging.info("使用自行安装的Chrome")
return driver
# 登录流程
def login(driver, url_login, account, password):
driver.get(url_login)
WebDriverWait(driver, 10).until(EC.url_to_be(url_login))
username_field = driver.find_element(By.NAME, 'domainAccount')
username_field.clear()
username_field.send_keys(account)
password_field = driver.find_element(By.NAME, 'password')
password_field.clear()
password_field.send_keys(password)
login_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CLASS_NAME, 'sso-btn-submit'))
)
login_button.click()
WebDriverWait(driver, 10).until_not(EC.url_to_be(url_login))
logging.info("登录成功!")
# 将在线时长转换为分钟
def parse_online_time(online_time_str):
2024-07-22 13:45:14 +00:00
# 如果字符串中同时包含小时和分钟
if "小时" in online_time_str and "分钟" in online_time_str:
match = re.search(r'(\d+)小时(\d+)分钟', online_time_str)
if match:
hours, minutes = match.groups()
return int(hours) * 60 + int(minutes)
# 如果字符串中只包含小时
elif "小时" in online_time_str:
match = re.search(r'(\d+)小时', online_time_str)
if match:
hours = match.group(1)
return int(hours) * 60
# 如果字符串中只包含分钟
elif "分钟" in online_time_str:
match = re.search(r'(\d+)分钟', online_time_str)
if match:
minutes = match.group(1)
return int(minutes)
# 如果字符串中既不包含小时也不包含分钟则返回0
2024-07-22 08:47:35 +00:00
else:
2024-07-22 13:45:14 +00:00
return 0
2024-07-22 08:47:35 +00:00
# 主要逻辑
def main():
driver = init_webdriver(driver_type)
if driver is None:
logging.error("驱动初始化失败")
return "驱动异常"
else:
logging.info("驱动初始化完毕")
driver.get(url_base) # 开始进入网页
2024-07-22 13:45:14 +00:00
try:
while driver.window_handles:
if driver.current_url == url_login:
sleep(3)
# 填写手机号码
username_field = driver.find_element(By.NAME, 'domainAccount')
username_field.clear()
username_field.send_keys(account)
# 填写密码
password_field = driver.find_element(By.NAME, 'password')
password_field.clear()
password_field.send_keys(password)
2024-07-22 08:47:35 +00:00
2024-07-22 13:45:14 +00:00
# 使用WebDriverWait等待登录按钮变为可点击状态
login_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CLASS_NAME, 'sso-btn-submit'))
)
login_button.click()
logging.info("登录完成")
# 等待一段时间,确保登录过程完成
sleep(3)
if driver.current_url == url_home:
logging.info("已经进入后台主页")
sleep(1) # 等待加载,跳转到工作目录
driver.get(url_work)
sleep(1) # 等待加载,跳转到工作目录
logging.info("已经跳转到工作页面;"+driver.current_url)
if driver.current_url == url_work:
sleep(3)
logging.info("开始提取数据")
# 任务1: 提取目标在线时长和目标完单量
logging.info("任务1提取目标在线时长和目标完单量")
target_info_element = driver.find_element(By.XPATH,
'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[1]/div/div[1]/div/div/div[2]/span')
target_info_text = target_info_element.text
logging.info(target_info_text)
# target_online_time = int(re.search(r'(\d+)小时(\d+)分钟', target_info_text).group(1)) * 60 + int(
# re.search(r'(\d+)小时(\d+)分钟', target_info_text).group(2))
#
match = re.search(r'(?P<hours>\d+)?小时?(?P<minutes>\d+)?分钟?', target_info_text)
if match:
target_online_time = int(match.group(1)) * 60 + int(match.group(2))
2024-07-22 08:47:35 +00:00
else:
2024-07-22 13:45:14 +00:00
logging.error("无法从文本中提取目标在线时长信息")
sleep(2)
try:
match = re.search(r'(\S*)', target_info_text.replace(" ",""))
if match:
extracted_text = match.group(1)
target_online_time =parse_online_time(extracted_text)
else:
target_online_time=0
except Exception as e:
logging.error(e)
continue
2024-07-22 08:47:35 +00:00
2024-07-22 13:45:14 +00:00
logging.info("目标时间min"+str(target_online_time))
target_order_completion = int(re.search(r'目标完单量:(\d+)', target_info_text).group(1))
logging.info("目标单量:"+str(target_order_completion))
# 任务2: 提取表格数据并确定目标列的索引
logging.info("任务2提取表格数据并确定目标列的索引")
headers =[]
for i in range(1, 13): # 假设有12列
logging.info("开始查找表头:"+str(i))
# 使用字符串格式化构建XPath表达式
xpath_expression = f'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div'
# 使用XPath表达式查找元素
header_element = driver.find_element(By.XPATH, xpath_expression)
2024-07-22 08:47:35 +00:00
2024-07-22 13:45:14 +00:00
if header_element.text=="" or header_element.text==None:
logging.error("方法1 失败")
xpath_expression = f'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]'
header_element = driver.find_element(By.XPATH, xpath_expression)
# if header_element.text=="" or header_element.text==None:
# logging.error("方法2 失败")
# xpath_expression = f'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]/div/div'
# header_element = driver.find_element(By.XPATH, xpath_expression)
if header_element.text=="" or header_element.text==None:
logging.error("方法3 失败")
xpath_expression = f'/html/body/div[1]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]'
header_element = driver.find_element(By.XPATH, xpath_expression)
if header_element.text == "" or header_element.text == None:
logging.error("方法4 失败")
logging.error("依旧没有找到内容"+str(i))
logging.error("开始手动补齐")
if i ==8:
headers.append("全天完单量")
elif i==9:
headers.append("时段内在线时长")
elif i == 10:
headers.append("时段内背单时长")
elif i == 11:
headers.append("时段内完单量")
elif i == 12:
headers.append("配送中单量")
else:
logging.error(str(i)+" : "+str(header_element.text))
headers.append(header_element.text.replace(" ",""))
header_to_index = {header: index for index, header in enumerate(headers)}
online_time_header = "全天在线时长"
order_completion_header = "全天完单量"
logging.info("获取表头如下:")
logging.info(headers)
2024-07-22 08:47:35 +00:00
2024-07-22 13:45:14 +00:00
# 任务3: 根据表格数据和目标值,筛选出不合格的人
logging.info("任务3根据表格数据和目标值筛选出不合格的人")
unqualified_persons = []
rows = driver.find_elements(By.XPATH, '//tbody/tr')
for row in rows:
cells = row.find_elements(By.TAG_NAME, 'td')
row_data = [cell.text for cell in cells]
name = row_data[header_to_index["姓名"]]
online_time = row_data[header_to_index[online_time_header]]
order_completion = row_data[header_to_index[order_completion_header]]
2024-07-22 08:47:35 +00:00
2024-07-22 13:45:14 +00:00
# 将在线时长转换为分钟
total_online_time = parse_online_time(online_time)
2024-07-22 08:47:35 +00:00
2024-07-22 13:45:14 +00:00
# 比较在线时长和完单量
if total_online_time < target_online_time or int(order_completion) < target_order_completion:
unqualified_persons.append(row_data)
# logging.info(
# f"目标在线时长: {target_online_time // 60}小时{target_online_time % 60}分钟, 目标完单量: {target_order_completion}")
# # logging.info("不合格人员名单:", unqualified_persons)
# 发送数据到WebSocket
data_to_send = {
"目标时间": target_online_time,
"目标工作量": target_order_completion,
"未及格成员": unqualified_persons
}
sio.emit("data", json.dumps(data_to_send))
logging.info("准备工作……休息中……请等待休息完毕……")
driver.get(url_home) # 返回主页 避免登录失效
sleep(interval)
except Exception as e:
# 捕获异常比如网络问题或其他Selenium错误
print(f"An exception occurred: {e}")
sio.emit("log", {"err":"网络错误"})
finally:
sio.emit("log", {"info":"关闭连接"})
sio.disconnect()
# 确保在退出前关闭浏览器
driver.quit()
2024-07-22 08:47:35 +00:00
if __name__ == '__main__':
2024-07-22 13:45:14 +00:00
# 获取当前日期
today = datetime.now().date()
# 计算今年的8月10日
this_years_cutoff = datetime(today.year, 8, 10).date()
# 检查今天是否超过了截止日期
if today > this_years_cutoff:
print("软件已到期,请联系供应商更新许可证。")
else:
# 计算距离到期的天数
days_until_expiration = (this_years_cutoff - today).days
# 日志信息
logging.info(f"软件距离到期还有{days_until_expiration}")
data_to_send = {"info": "开始连接"}
sio.emit('log', data_to_send)
main()