"""Poll a web back-office page and report people who miss their daily targets.

The script opens a browser through Selenium, signs in on the SSO page when it
is redirected there, navigates to the work page, reads the target online time
and target completed-order count, reads the table below it, and logs the names
of everyone whose figures are below either target.  It then sleeps for the
configured interval and repeats.

All settings (URLs, credentials, polling interval, browser type, local driver
paths) come from the ``[base]`` section of ``config.ini`` in the working
directory.
"""
import configparser
import logging
import os
import re
import ssl
from time import sleep
from typing import List, Sequence, Tuple

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.firefox.service import Service as FirefoxService
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import GeckoDriverManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager

# Let webdriver_manager download drivers without verifying TLS certificates.
os.environ['WDM_SSL_VERIFY'] = "false"

# Logging configuration.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load the configuration file.
config = configparser.ConfigParser()
config.read('config.ini', encoding="utf-8")

# Settings read from the [base] section.
url_base = config.get('base', 'url_base')
url_login = r"https://mozi-login.alibaba-inc.com/?APP_NAME=LPD_TEAM_AEOLUS&BACK_URL=https%3A%2F%2Faeolus.ele.me"
url_home = config.get('base', 'url_home')
url_work = config.get('base', 'url_work')
account = config.get('base', 'account')
password = config.get('base', 'password')
interval = int(config.get('base', 'interval'))
driver_type = config.get('base', 'driver')
driver_path_edge = config.get('base', 'driver_path_edge')
driver_path_chrome = config.get('base', 'driver_path_chrome')

# Element that holds the "target online time / target completed orders" text.
_TARGET_INFO_XPATH = (
    '//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]'
    '/div/div[1]/div/div[1]/div/div/div[2]/span'
)

# Header-cell lookups, tried in order.  Each entry is (XPath template, message
# logged when that lookup yields no text).  The messages keep their historical
# numbering so existing log searches still match.
_HEADER_LOOKUPS = (
    (
        '//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]'
        '/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{index}]/div',
        "方法1 失败",
    ),
    (
        '//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]'
        '/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{index}]/div/span[1]',
        "方法3 失败",
    ),
    (
        '/html/body/div[1]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]'
        '/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{index}]/div/span[1]',
        "方法4 失败",
    ),
)

# Header names substituted (by 1-based column number) when no lookup works.
_FALLBACK_HEADERS = {
    8: "全天完单量",
    9: "时段内在线时长",
    10: "时段内背单时长",
    11: "时段内完单量",
    12: "配送中单量",
}

# The table is assumed to have 12 columns.
_HEADER_COLUMN_COUNT = 12

_NAME_HEADER = "姓名"
_ONLINE_TIME_HEADER = "全天在线时长"
_ORDER_COMPLETION_HEADER = "全天完单量"

_ONLINE_TIME_PATTERN = re.compile(r'(?:(\d*)小时)?(?:(\d*)分钟)?')
_TARGET_TIME_PATTERN = re.compile(r'(\d+)小时(\d+)分钟')
_TARGET_ORDERS_PATTERN = re.compile(r'目标完单量:(\d+)')


def _init_local_webdriver(browser: str):
    """Start a browser with a driver binary from ``config.ini``.

    Used when the automatically downloaded driver could not be started.
    Edge is used when *browser* is ``"edge"`` and ``driver_path_edge`` is set;
    otherwise Chrome is started from ``driver_path_chrome``.

    Returns:
        The WebDriver instance, or ``None`` when no usable local driver is
        configured or it fails to start.
    """
    try:
        if browser == "edge" and driver_path_edge:
            driver = webdriver.Edge(
                service=EdgeService(executable_path=driver_path_edge),
                options=EdgeOptions(),
            )
            logging.info("使用自行安装的Edge")
            return driver
        if driver_path_chrome:
            # The Chrome driver binary must be paired with the Chrome driver
            # class; starting Edge with it cannot work.
            driver = webdriver.Chrome(
                service=Service(executable_path=driver_path_chrome),
                options=Options(),
            )
            logging.info("使用自行安装的Chrome")
            return driver
    except Exception as e:  # boundary: report and let the caller see None
        logging.error(f"无法创建WebDriver实例: {e}")
    return None


# WebDriver initialisation
def init_webdriver(driver_type: str):
    """Create a Selenium WebDriver for the requested browser.

    The driver binary is first obtained through webdriver_manager.  If that
    fails for any reason, a locally installed driver from ``config.ini`` is
    tried instead.

    Args:
        driver_type: ``"firefox"``, ``"edge"`` or ``"chrome"`` (case-insensitive).

    Returns:
        A WebDriver instance, or ``None`` when the browser type is unknown or
        no driver could be started.
    """
    browser = driver_type.lower()
    managers = {
        "firefox": GeckoDriverManager,
        "edge": EdgeChromiumDriverManager,
        "chrome": ChromeDriverManager,
    }
    manager = managers.get(browser)
    if manager is None:
        logging.error(f"不支持的浏览器类型: {driver_type}")
        return None

    try:
        driver_path = manager().install()
        logging.info("找到驱动" + driver_path)
        # Each browser gets its own Service/Options classes.
        if browser == "firefox":
            firefox_options = webdriver.FirefoxOptions()
            firefox_options.accept_insecure_certs = True
            driver = webdriver.Firefox(
                service=FirefoxService(executable_path=driver_path),
                options=firefox_options,
            )
        elif browser == "edge":
            driver = webdriver.Edge(
                service=EdgeService(executable_path=driver_path),
                options=EdgeOptions(),
            )
        else:  # chrome
            chrome_options = Options()
            chrome_options.add_argument('--ignore-certificate-errors')
            driver = webdriver.Chrome(
                service=Service(executable_path=driver_path),
                options=chrome_options,
            )
        logging.info(f"成功 - 使用{driver_type}")
        return driver
    except Exception as e:  # boundary: fall back to a locally installed driver
        logging.error(f"无法创建WebDriver实例: {e}")
    return _init_local_webdriver(browser)


def _submit_credentials(driver, account, password):
    """Fill in the login form on the current page and click the submit button."""
    username_field = driver.find_element(By.NAME, 'domainAccount')
    username_field.clear()
    username_field.send_keys(account)

    password_field = driver.find_element(By.NAME, 'password')
    password_field.clear()
    password_field.send_keys(password)

    # Wait (up to 10 s) until the submit button can actually be clicked.
    login_button = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable((By.CLASS_NAME, 'sso-btn-submit'))
    )
    login_button.click()


# Login flow
def login(driver, url_login, account, password):
    """Open the login page, submit the credentials and wait to leave the page.

    Args:
        driver: Selenium WebDriver to operate.
        url_login: URL of the login page.
        account: Account name typed into the ``domainAccount`` field.
        password: Password typed into the ``password`` field.

    Raises:
        selenium.common.exceptions.TimeoutException: if the login page is not
            reached, the button never becomes clickable, or the browser is
            still on the login URL 10 seconds after submitting.
    """
    driver.get(url_login)
    WebDriverWait(driver, 10).until(EC.url_to_be(url_login))
    _submit_credentials(driver, account, password)
    WebDriverWait(driver, 10).until_not(EC.url_to_be(url_login))
    logging.info("登录成功!")


# Convert an online-time string to minutes
def parse_online_time(online_time_str) -> int:
    """Convert an online-time string into a number of minutes.

    Accepts ``"X小时Y分钟"``, hours only (``"X小时"``), minutes only
    (``"Y分钟"``) or a bare integer number of minutes.  Surrounding whitespace
    is ignored.

    Returns:
        Total minutes, or 0 when the text is empty or not recognised.
    """
    if not online_time_str:
        return 0
    text = online_time_str.strip()
    # Both groups are optional, so the pattern always matches; what matters is
    # whether at least one of them captured digits.
    hours, minutes = _ONLINE_TIME_PATTERN.match(text).groups()
    if hours or minutes:
        return int(hours or 0) * 60 + int(minutes or 0)
    # No unit found: treat a plain number as minutes.
    return int(text) if text.isdigit() else 0


def _parse_targets(target_info_text: str) -> Tuple[int, int]:
    """Extract (target online minutes, target completed orders) from the page text.

    Raises:
        ValueError: if either value cannot be found in *target_info_text*.
    """
    time_match = _TARGET_TIME_PATTERN.search(target_info_text)
    orders_match = _TARGET_ORDERS_PATTERN.search(target_info_text)
    if time_match is None or orders_match is None:
        raise ValueError(f"无法解析目标信息: {target_info_text!r}")
    target_minutes = int(time_match.group(1)) * 60 + int(time_match.group(2))
    return target_minutes, int(orders_match.group(1))


def _read_header(driver, index: int) -> str:
    """Return the text of table header number *index* (1-based), spaces removed.

    The known XPath variants are tried in order.  When none yields text, the
    hard-coded name for that column is returned, or an empty string when there
    is none, so the result always occupies the column's position.
    """
    logging.info("开始查找表头:" + str(index))
    for xpath_template, failure_message in _HEADER_LOOKUPS:
        # find_elements returns an empty list instead of raising, so a missing
        # node simply moves on to the next lookup.
        elements = driver.find_elements(By.XPATH, xpath_template.format(index=index))
        text = elements[0].text if elements else ""
        if text:
            logging.info(str(index) + " : " + str(text))
            return text.replace(" ", "")
        logging.error(failure_message)

    logging.error("依旧没有找到内容" + str(index))
    logging.error("开始手动补齐")
    return _FALLBACK_HEADERS.get(index, "")


def _find_unqualified(
    headers: Sequence[str],
    rows: Sequence[Sequence[str]],
    target_online_time: int,
    target_order_completion: int,
) -> List[str]:
    """Return the names of people below either target.

    Args:
        headers: Header names, positioned like the cells of each row.
        rows: Cell texts of every table row.
        target_online_time: Required online time in minutes.
        target_order_completion: Required number of completed orders.

    Raises:
        KeyError: if the name, online-time or completed-orders header is missing.
    """
    header_to_index = {header: index for index, header in enumerate(headers)}
    name_index = header_to_index[_NAME_HEADER]
    online_index = header_to_index[_ONLINE_TIME_HEADER]
    orders_index = header_to_index[_ORDER_COMPLETION_HEADER]
    last_needed = max(name_index, online_index, orders_index)

    unqualified_persons = []
    for row_data in rows:
        # Skip placeholder rows: too few cells or no name.
        if len(row_data) <= last_needed or not row_data[name_index].strip():
            continue
        total_online_time = parse_online_time(row_data[online_index])
        orders_text = row_data[orders_index].strip()
        # A non-numeric order count is treated as zero orders.
        order_completion = int(orders_text) if orders_text.isdigit() else 0
        if total_online_time < target_online_time or order_completion < target_order_completion:
            unqualified_persons.append(row_data[name_index])
    return unqualified_persons


def _report_unqualified(driver):
    """Read targets and table from the work page and log who misses the targets."""
    logging.info("开始提取数据")

    # Task 1: target online time and target completed orders.
    logging.info("任务1:提取目标在线时长和目标完单量")
    target_info_text = driver.find_element(By.XPATH, _TARGET_INFO_XPATH).text
    logging.info(target_info_text)
    target_online_time, target_order_completion = _parse_targets(target_info_text)
    logging.info("目标时间(min):" + str(target_online_time))
    logging.info("目标单量:" + str(target_order_completion))

    # Task 2: table headers, one entry per column so indexes line up with cells.
    logging.info("任务2:提取表格数据并确定目标列的索引")
    headers = [_read_header(driver, i) for i in range(1, _HEADER_COLUMN_COUNT + 1)]
    logging.info("获取表头如下:")
    logging.info(headers)

    # Task 3: compare every row against the targets.
    logging.info("任务3:根据表格数据和目标值,筛选出不合格的人")
    rows = [
        [cell.text for cell in row.find_elements(By.TAG_NAME, 'td')]
        for row in driver.find_elements(By.XPATH, '//tbody/tr')
    ]
    unqualified_persons = _find_unqualified(
        headers, rows, target_online_time, target_order_completion
    )

    logging.info(
        f"目标在线时长: {target_online_time // 60}小时{target_online_time % 60}分钟, 目标完单量: {target_order_completion}")
    # Pass the list as a logging argument; without the placeholder the record
    # fails to format and the names are never shown.
    logging.info("不合格人员名单:%s", unqualified_persons)


# Main logic
def main():
    """Run the polling loop until interrupted or an error occurs.

    Returns:
        ``"驱动异常"`` when no WebDriver could be created; otherwise the loop
        only ends through an exception (including Ctrl+C).
    """
    driver = init_webdriver(driver_type)
    if driver is None:
        logging.error("驱动初始化失败")
        return "驱动异常"
    logging.info("驱动初始化完毕")

    try:
        driver.get(url_base)  # open the site
        while True:
            if driver.current_url == url_login:
                sleep(3)
                _submit_credentials(driver, account, password)
                logging.info("登录完成")
                # Give the login redirect time to finish.
                sleep(3)

            if driver.current_url == url_home:
                logging.info("已经进入后台主页")
                sleep(1)  # let the page load before navigating to the work page
                driver.get(url_work)
                sleep(1)  # let the work page load
                logging.info("已经跳转到工作页面;" + driver.current_url)

            if driver.current_url == url_work:
                sleep(3)
                _report_unqualified(driver)

            logging.info("准备工作……休息中……请等待休息完毕……")
            sleep(interval)
    finally:
        # Always close the browser and its driver process.
        driver.quit()


if __name__ == '__main__':
    main()