import configparser import json import logging import os import re from datetime import datetime from time import sleep import socketio import websocket from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from webdriver_manager.chrome import ChromeDriverManager from webdriver_manager.microsoft import EdgeChromiumDriverManager from webdriver_manager.firefox import GeckoDriverManager import ssl os.environ['WDM_SSL_VERIFY'] = "false" # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 读取配置文件 config = configparser.ConfigParser() config.read('config.ini', encoding="utf-8") # 从配置文件中读取参数 url_base = config.get('base', 'url_base') url_login = r"https://mozi-login.alibaba-inc.com/?APP_NAME=LPD_TEAM_AEOLUS&BACK_URL=https%3A%2F%2Faeolus.ele.me" url_home = config.get('base', 'url_home') url_work = config.get('base', 'url_work') account = config.get('base', 'account') password = config.get('base', 'password') interval = int(config.get('base', 'interval')) driver_type = config.get('base', 'driver') driver_path_edge = config.get('base', 'driver_path_edge') driver_path_chrome = config.get('base', 'driver_path_chrome') WEBSOCKET_URL = config.get('WebSocket', 'WEBSOCKET_URL') key = config.get('web', 'key') # 创建一个 Socket.IO 客户端实例 sio = socketio.Client() # 定义连接事件处理器 @sio.event def connect(): print('Connected to server') # 定义断开连接事件处理器 @sio.event def disconnect(): print('Disconnected from server') # 定义接收服务器消息的事件处理器 @sio.on('message') def on_message(data): print('Received message:', data) # 从配置文件中读取参数 host = config.get('WebSocket', 'host') port = config.get('WebSocket', 'port') # 连接到服务器 sio.connect("ws://"+host+":"+port) # WebDriver初始化 def init_webdriver(driver_type): # 先检查是否配置了自行安装的驱动 try: if driver_type.lower() == "edge" and driver_path_edge: # 只有在Edge类型且路径已知的情况下才尝试使用已存在的驱动 edge_options = Options() # 创建Edge的Options实例 service = Service(executable_path=driver_path_edge) # 使用已知的驱动路径 driver = webdriver.Edge(service=service, options=edge_options) # 传入Options实例 logging.info("使用自行安装的Edge") return driver elif driver_path_chrome: edge_options = Options() # 创建Edge的Options实例 service = Service(executable_path=driver_path_chrome) # 使用已知的驱动路径 driver = webdriver.Edge(service=service, options=edge_options) # 传入Options实例 logging.info("使用自行安装的Chrome") return driver except Exception as e: logging.info("使用本地驱动失败,请先配置本地驱动。"+e.args[0]) options = Options() manager = { "firefox": GeckoDriverManager, "edge": EdgeChromiumDriverManager, "chrome": ChromeDriverManager }[driver_type.lower()] driver_path = None # 初始化为None,以防安装失败 try: driver_path = manager().install() logging.info("找到驱动" + driver_path) service = Service(executable_path=driver_path) if driver_type.lower() == "firefox": oprofile = webdriver.FirefoxOptions() oprofile.accept_insecure_certs = True driver = webdriver.Firefox(service=service, options=oprofile) elif driver_type.lower() == "edge": driver = webdriver.Edge(service=service, options=options) else: # chrome options.add_argument('--ignore-certificate-errors') driver = webdriver.Chrome(service=service, options=options) logging.info(f"成功 - 使用{driver_type}") return driver except Exception as e: logging.error(f"无法创建WebDriver实例: {e}") if driver_type.lower() == "edge" and driver_path_edge: # 只有在Edge类型且路径已知的情况下才尝试使用已存在的驱动 edge_options = Options() # 创建Edge的Options实例 service = Service(executable_path=driver_path_edge) # 使用已知的驱动路径 driver = webdriver.Edge(service=service, options=edge_options) # 传入Options实例 logging.info("使用自行安装的Edge") return driver else: edge_options = Options() # 创建Edge的Options实例 service = Service(executable_path=driver_path_chrome) # 使用已知的驱动路径 driver = webdriver.Edge(service=service, options=edge_options) # 传入Options实例 logging.info("使用自行安装的Chrome") return driver # 登录流程 def login(driver, url_login, account, password): driver.get(url_login) WebDriverWait(driver, 10).until(EC.url_to_be(url_login)) username_field = driver.find_element(By.NAME, 'domainAccount') username_field.clear() username_field.send_keys(account) password_field = driver.find_element(By.NAME, 'password') password_field.clear() password_field.send_keys(password) login_button = WebDriverWait(driver, 10).until( EC.element_to_be_clickable((By.CLASS_NAME, 'sso-btn-submit')) ) login_button.click() WebDriverWait(driver, 10).until_not(EC.url_to_be(url_login)) logging.info("登录成功!") # 将在线时长转换为分钟 def parse_online_time(online_time_str): # 如果字符串中同时包含小时和分钟 if "小时" in online_time_str and "分钟" in online_time_str: match = re.search(r'(\d+)小时(\d+)分钟', online_time_str) if match: hours, minutes = match.groups() return int(hours) * 60 + int(minutes) # 如果字符串中只包含小时 elif "小时" in online_time_str: match = re.search(r'(\d+)小时', online_time_str) if match: hours = match.group(1) return int(hours) * 60 # 如果字符串中只包含分钟 elif "分钟" in online_time_str: match = re.search(r'(\d+)分钟', online_time_str) if match: minutes = match.group(1) return int(minutes) # 如果字符串中既不包含小时也不包含分钟,则返回0 else: return 0 # 主要逻辑 def main(): driver = init_webdriver(driver_type) if driver is None: logging.error("驱动初始化失败") return "驱动异常" else: logging.info("驱动初始化完毕") driver.get(url_base) # 开始进入网页 try: while driver.window_handles: if driver.current_url == url_login: sleep(3) # 填写手机号码 username_field = driver.find_element(By.NAME, 'domainAccount') username_field.clear() username_field.send_keys(account) # 填写密码 password_field = driver.find_element(By.NAME, 'password') password_field.clear() password_field.send_keys(password) # 使用WebDriverWait等待登录按钮变为可点击状态 login_button = WebDriverWait(driver, 10).until( EC.element_to_be_clickable((By.CLASS_NAME, 'sso-btn-submit')) ) login_button.click() logging.info("登录完成") # 等待一段时间,确保登录过程完成 sleep(3) if driver.current_url == url_home: logging.info("已经进入后台主页") sleep(1) # 等待加载,跳转到工作目录 driver.get(url_work) sleep(1) # 等待加载,跳转到工作目录 logging.info("已经跳转到工作页面;"+driver.current_url) if driver.current_url == url_work: sleep(3) logging.info("开始提取数据") # 任务1: 提取目标在线时长和目标完单量 logging.info("任务1:提取目标在线时长和目标完单量") target_info_element = driver.find_element(By.XPATH, '//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[1]/div/div[1]/div/div/div[2]/span') target_info_text = target_info_element.text logging.info(target_info_text) # target_online_time = int(re.search(r'(\d+)小时(\d+)分钟', target_info_text).group(1)) * 60 + int( # re.search(r'(\d+)小时(\d+)分钟', target_info_text).group(2)) # match = re.search(r'(?P\d+)?小时?(?P\d+)?分钟?', target_info_text) if match: target_online_time = int(match.group(1)) * 60 + int(match.group(2)) else: logging.error("无法从文本中提取目标在线时长信息") sleep(2) try: match = re.search(r':(\S*),', target_info_text.replace(" ","")) if match: extracted_text = match.group(1) target_online_time =parse_online_time(extracted_text) else: target_online_time=0 except Exception as e: logging.error(e) continue logging.info("目标时间(min):"+str(target_online_time)) target_order_completion = int(re.search(r'目标完单量:(\d+)', target_info_text).group(1)) logging.info("目标单量:"+str(target_order_completion)) # 任务2: 提取表格数据并确定目标列的索引 logging.info("任务2:提取表格数据并确定目标列的索引") headers =[] for i in range(1, 13): # 假设有12列 logging.info("开始查找表头:"+str(i)) # 使用字符串格式化构建XPath表达式 xpath_expression = f'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div' # 使用XPath表达式查找元素 header_element = driver.find_element(By.XPATH, xpath_expression) if header_element.text=="" or header_element.text==None: logging.error("方法1 失败") xpath_expression = f'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]' header_element = driver.find_element(By.XPATH, xpath_expression) # if header_element.text=="" or header_element.text==None: # logging.error("方法2 失败") # xpath_expression = f'//*[@id="root"]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]/div/div' # header_element = driver.find_element(By.XPATH, xpath_expression) if header_element.text=="" or header_element.text==None: logging.error("方法3 失败") xpath_expression = f'/html/body/div[1]/div/div/div/div/div[2]/div/div/div/div/div[2]/div[2]/div/div[2]/div/div/div/div/div[1]/table/thead/tr/th[{i}]/div/span[1]' header_element = driver.find_element(By.XPATH, xpath_expression) if header_element.text == "" or header_element.text == None: logging.error("方法4 失败") logging.error("依旧没有找到内容"+str(i)) logging.error("开始手动补齐") if i ==8: headers.append("全天完单量") elif i==9: headers.append("时段内在线时长") elif i == 10: headers.append("时段内背单时长") elif i == 11: headers.append("时段内完单量") elif i == 12: headers.append("配送中单量") else: logging.error(str(i)+" : "+str(header_element.text)) headers.append(header_element.text.replace(" ","")) header_to_index = {header: index for index, header in enumerate(headers)} online_time_header = "全天在线时长" order_completion_header = "全天完单量" logging.info("获取表头如下:") logging.info(headers) # 任务3: 根据表格数据和目标值,筛选出不合格的人 logging.info("任务3:根据表格数据和目标值,筛选出不合格的人") unqualified_persons = [] rows = driver.find_elements(By.XPATH, '//tbody/tr') for row in rows: cells = row.find_elements(By.TAG_NAME, 'td') row_data = [cell.text for cell in cells] name = row_data[header_to_index["姓名"]] online_time = row_data[header_to_index[online_time_header]] order_completion = row_data[header_to_index[order_completion_header]] # 将在线时长转换为分钟 total_online_time = parse_online_time(online_time) # 比较在线时长和完单量 if total_online_time < target_online_time or int(order_completion) < target_order_completion: unqualified_persons.append(row_data) # logging.info( # f"目标在线时长: {target_online_time // 60}小时{target_online_time % 60}分钟, 目标完单量: {target_order_completion}") # # logging.info("不合格人员名单:", unqualified_persons) # 发送数据到WebSocket data_to_send = { "目标时间": target_online_time, "目标工作量": target_order_completion, "未及格成员": unqualified_persons } sio.emit(key, json.dumps(data_to_send)) logging.info("准备工作……休息中……请等待休息完毕……") driver.get(url_home) # 返回主页 避免登录失效 sleep(interval) except Exception as e: # 捕获异常,比如网络问题或其他Selenium错误 print(f"An exception occurred: {e}") sio.emit("log_"+key, {"err":"网络错误"}) finally: sio.emit("log_"+key, {"info":"关闭连接"}) sio.disconnect() # 确保在退出前关闭浏览器 driver.quit() if __name__ == '__main__': # 获取当前日期 today = datetime.now().date() # 计算今年的8月10日 this_years_cutoff = datetime(today.year, 8, 10).date() # 检查今天是否超过了截止日期 if today > this_years_cutoff: print("软件已到期,请联系供应商更新许可证。") else: # 计算距离到期的天数 days_until_expiration = (this_years_cutoff - today).days # 日志信息 logging.info(f"软件距离到期还有{days_until_expiration}天") data_to_send = {"info": "开始连接"} sio.emit('log_'+key, data_to_send) main()