2024-07-22 07:21:36 +00:00
|
|
|
|
import configparser
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
2024-07-22 08:47:35 +00:00
|
|
|
|
import re
|
2024-07-22 07:21:36 +00:00
|
|
|
|
from time import sleep
|
|
|
|
|
from selenium.webdriver.common.by import By
|
|
|
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
|
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
|
|
|
from selenium import webdriver
|
|
|
|
|
from selenium.webdriver.chrome.options import Options
|
|
|
|
|
from selenium.webdriver.chrome.service import Service
|
|
|
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
|
|
|
from webdriver_manager.microsoft import EdgeChromiumDriverManager
|
|
|
|
|
from webdriver_manager.firefox import GeckoDriverManager
|
|
|
|
|
import ssl
|
|
|
|
|
# Environment flag set before any driver download happens.
# NOTE(review): presumably consumed by webdriver_manager to skip TLS
# verification when fetching driver binaries — confirm for the installed version.
os.environ['WDM_SSL_VERIFY'] = "false"

# Configure logging
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Load the configuration file
config = configparser.ConfigParser()
config.read('config.ini', encoding="utf-8")

# Pull the runtime parameters out of the [base] section
url_base = config.get('base', 'url_base')
# The login URL is fixed in code rather than read from config.ini
url_login = r"https://mozi-login.alibaba-inc.com/?APP_NAME=LPD_TEAM_AEOLUS&BACK_URL=https%3A%2F%2Faeolus.ele.me"
url_home = config.get('base', 'url_home')
url_work = config.get('base', 'url_work')
account = config.get('base', 'account')
password = config.get('base', 'password')
# Passed to sleep() between extraction rounds, so the unit is seconds
interval = config.getint('base', 'interval')
driver_type = config.get('base', 'driver')
# Paths to locally installed driver binaries, used when auto-install fails
driver_path_edge = config.get('base', 'driver_path_edge')
driver_path_chrome = config.get('base', 'driver_path_chrome')
|
2024-07-22 13:45:14 +00:00
|
|
|
|
|
2024-07-22 07:21:36 +00:00
|
|
|
|
# WebDriver初始化
|
|
|
|
|
def init_webdriver(driver_type):
    """Create a Selenium WebDriver for the requested browser.

    The driver binary is first obtained through webdriver_manager. If that
    (or starting the browser) fails, a locally installed driver is tried:
    ``driver_path_edge`` for Edge when it is configured, otherwise
    ``driver_path_chrome`` with Chrome.

    Args:
        driver_type: "firefox", "edge" or "chrome" (case-insensitive).

    Returns:
        The started WebDriver, or ``None`` when the fallback also fails.

    Raises:
        KeyError: if ``driver_type`` is not one of the supported names.
    """
    # Imported locally so each browser is paired with its own Service class;
    # the module-level ``Service``/``Options`` names are the Chrome variants.
    from selenium.webdriver.edge.service import Service as EdgeService
    from selenium.webdriver.firefox.service import Service as FirefoxService

    browser = driver_type.lower()
    manager = {
        "firefox": GeckoDriverManager,
        "edge": EdgeChromiumDriverManager,
        "chrome": ChromeDriverManager,
    }[browser]

    try:
        driver_path = manager().install()
        logging.info("找到驱动" + driver_path)

        if browser == "firefox":
            firefox_options = webdriver.FirefoxOptions()
            firefox_options.accept_insecure_certs = True
            driver = webdriver.Firefox(
                service=FirefoxService(executable_path=driver_path),
                options=firefox_options,
            )
        elif browser == "edge":
            driver = webdriver.Edge(
                service=EdgeService(executable_path=driver_path),
                options=webdriver.EdgeOptions(),
            )
        else:  # chrome
            chrome_options = Options()
            chrome_options.add_argument('--ignore-certificate-errors')
            driver = webdriver.Chrome(
                service=Service(executable_path=driver_path),
                options=chrome_options,
            )

        logging.info(f"成功 - 使用{driver_type}")
        return driver
    except Exception as e:
        logging.error(f"无法创建WebDriver实例: {e}")

    # Fallback: use a driver binary the user installed themselves.
    try:
        if browser == "edge" and driver_path_edge:
            driver = webdriver.Edge(
                service=EdgeService(executable_path=driver_path_edge),
                options=webdriver.EdgeOptions(),
            )
            logging.info("使用自行安装的Edge")
        else:
            # The Chrome driver path must be paired with the Chrome browser
            # class (this branch previously started Edge with it).
            driver = webdriver.Chrome(
                service=Service(executable_path=driver_path_chrome),
                options=Options(),
            )
            logging.info("使用自行安装的Chrome")
        return driver
    except Exception as e:
        # main() checks for None, so report the failure instead of raising.
        logging.error(f"无法创建WebDriver实例: {e}")
        return None
|
|
|
|
|
|
|
|
|
|
# 登录流程
|
|
|
|
|
def login(driver, url_login, account, password):
    """Open the login page, submit the credentials and wait for the redirect.

    Args:
        driver: Selenium WebDriver used to drive the browser.
        url_login: URL of the login page.
        account: Value typed into the ``domainAccount`` field.
        password: Value typed into the ``password`` field.

    Each wait uses a 10-second timeout; WebDriverWait raises if the
    condition is not met in time.
    """
    wait = WebDriverWait(driver, 10)

    driver.get(url_login)
    wait.until(EC.url_to_be(url_login))

    # Fill the account field first, then the password field.
    credentials = (('domainAccount', account), ('password', password))
    for field_name, value in credentials:
        field = driver.find_element(By.NAME, field_name)
        field.clear()
        field.send_keys(value)

    submit_locator = (By.CLASS_NAME, 'sso-btn-submit')
    wait.until(EC.element_to_be_clickable(submit_locator)).click()

    # Leaving the login URL is treated as a successful login.
    wait.until_not(EC.url_to_be(url_login))
    logging.info("登录成功!")
|
2024-07-22 08:47:35 +00:00
|
|
|
|
# 将在线时长转换为分钟
|
|
|
|
|
def parse_online_time(online_time_str):
    """Convert an online-duration string to a whole number of minutes.

    Accepted forms are "X小时Y分钟", "X小时" and "Y分钟" (a missing number
    counts as 0), or a bare decimal string that is already in minutes.

    Args:
        online_time_str: Text of the duration cell.

    Returns:
        The duration in minutes, or 0 when the text matches no known form.
    """
    text = online_time_str.strip()

    # Both units are optional so hour-only and minute-only strings are parsed
    # too; the pattern can match the empty string, hence the group check.
    match = re.match(r'(?:(\d*)\s*小时)?\s*(?:(\d*)\s*分钟)?', text)
    hours, minutes = match.groups()
    if hours is not None or minutes is not None:
        return int(hours or 0) * 60 + int(minutes or 0)

    # No unit present: assume the value is already expressed in minutes.
    # isdecimal() accepts exactly the characters int() can convert.
    return int(text) if text.isdecimal() else 0
|
2024-07-22 07:21:36 +00:00
|
|
|
|
# 主要逻辑
|
|
|
|
|
# XPath of the element whose text holds the target online time and order count.
_TARGET_INFO_XPATH = '//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[1]/div/div[1]/div/div/div[2]/span'

# Candidate locators for the i-th table header, tried in order. Each is paired
# with the message logged when that candidate yields no text.
_HEADER_XPATHS = (
    ('//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div',
     "方法1 失败"),
    ('//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]',
     "方法3 失败"),
    ('/html/body/div[1]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]',
     "方法4 失败"),
)

# Header names filled in by hand when no locator returns text for a column.
_FALLBACK_HEADERS = {
    8: "全天完单量",
    9: "时段内在线时长",
    10: "时段内背单时长",
    11: "时段内完单量",
    12: "配送中单量",
}


def _submit_login_form(driver):
    """Type the configured credentials into the open login page and submit."""
    for field_name, value in (('domainAccount', account), ('password', password)):
        field = driver.find_element(By.NAME, field_name)
        field.clear()
        field.send_keys(value)

    # Wait until the login button becomes clickable before clicking it.
    login_button = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable((By.CLASS_NAME, 'sso-btn-submit'))
    )
    login_button.click()
    logging.info("登录完成")


def _read_targets(driver):
    """Return (target online minutes, target completed orders) from the page."""
    target_info_text = driver.find_element(By.XPATH, _TARGET_INFO_XPATH).text
    logging.info(target_info_text)

    # One search yields both the hour and the minute group.
    duration = re.search(r'(\d+)小时(\d+)分钟', target_info_text)
    target_online_time = int(duration.group(1)) * 60 + int(duration.group(2))
    logging.info("目标时间(min):" + str(target_online_time))

    target_order_completion = int(re.search(r'目标完单量:(\d+)', target_info_text).group(1))
    logging.info("目标单量:" + str(target_order_completion))
    return target_online_time, target_order_completion


def _read_headers(driver, column_count=12):
    """Return the table header names, one entry per column position."""
    headers = []
    for i in range(1, column_count + 1):
        logging.info("开始查找表头:" + str(i))

        text = ""
        for template, failure_message in _HEADER_XPATHS:
            # find_elements returns an empty list instead of raising, so a
            # missing node simply moves on to the next candidate locator.
            found = driver.find_elements(By.XPATH, template.format(i=i))
            text = found[0].text if found else ""
            if text:
                break
            logging.error(failure_message)

        if text:
            logging.info(str(i) + " : " + str(text))
            headers.append(text.replace(" ", ""))
            continue

        logging.error("依旧没有找到内容" + str(i))
        logging.error("开始手动补齐")
        # Always append something so later columns keep their real index.
        headers.append(_FALLBACK_HEADERS.get(i, ""))
    return headers


def _find_unqualified(driver, header_to_index, target_online_time, target_order_completion):
    """Return the names of rows below the online-time or order-count target."""
    name_col = header_to_index["姓名"]
    online_col = header_to_index["全天在线时长"]
    orders_col = header_to_index["全天完单量"]
    last_needed = max(name_col, online_col, orders_col)

    unqualified_persons = []
    for row in driver.find_elements(By.XPATH, '//tbody/tr'):
        row_data = [cell.text for cell in row.find_elements(By.TAG_NAME, 'td')]
        if len(row_data) <= last_needed:
            continue  # too few cells to be a data row

        try:
            order_completion = int(row_data[orders_col])
        except ValueError:
            # One malformed row must not stop the whole monitoring loop.
            logging.warning("跳过无法解析的行: %s", row_data)
            continue

        # Convert the online duration to minutes before comparing.
        total_online_time = parse_online_time(row_data[online_col])
        if total_online_time < target_online_time or order_completion < target_order_completion:
            unqualified_persons.append(row_data[name_col])
    return unqualified_persons


def main():
    """Log in, open the work page and report under-target people in a loop.

    Returns:
        "驱动异常" when no WebDriver could be created; otherwise the loop
        runs until the process is stopped or an exception propagates.
    """
    driver = init_webdriver(driver_type)
    if driver is None:
        logging.error("驱动初始化失败")
        return "驱动异常"
    logging.info("驱动初始化完毕")

    driver.get(url_base)  # open the site

    # NOTE(review): the original indentation of this function was lost; the
    # three URL checks are treated as sequential and the interval sleep as
    # part of the work-page branch — confirm against the intended behavior.
    while True:
        if driver.current_url == url_login:
            sleep(3)
            _submit_login_form(driver)
            # Give the login redirect some time to complete.
            sleep(3)

        if driver.current_url == url_home:
            logging.info("已经进入后台主页")
            sleep(1)  # let the page load before jumping to the work page
            driver.get(url_work)
            sleep(1)
            logging.info("已经跳转到工作页面;" + driver.current_url)

        if driver.current_url != url_work:
            # Not on a page we act on yet: poll gently instead of spinning.
            sleep(1)
            continue

        sleep(3)
        logging.info("开始提取数据")

        # Task 1: extract the target online time and target order count.
        logging.info("任务1:提取目标在线时长和目标完单量")
        target_online_time, target_order_completion = _read_targets(driver)

        # Task 2: read the table headers and map each name to its column.
        logging.info("任务2:提取表格数据并确定目标列的索引")
        headers = _read_headers(driver)
        header_to_index = {header: index for index, header in enumerate(headers)}
        logging.info("获取表头如下:")
        logging.info(headers)

        # Task 3: collect everyone who misses either target.
        logging.info("任务3:根据表格数据和目标值,筛选出不合格的人")
        unqualified_persons = _find_unqualified(
            driver, header_to_index, target_online_time, target_order_completion
        )

        logging.info(
            f"目标在线时长: {target_online_time // 60}小时{target_online_time % 60}分钟, 目标完单量: {target_order_completion}")
        # The placeholder is required; without it the list is never logged.
        logging.info("不合格人员名单:%s", unqualified_persons)
        logging.info("准备工作……休息中……请等待休息完毕……")
        sleep(interval)
|
2024-07-22 07:21:36 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Run main() only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
|