-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSelenium_Util.py
More file actions
354 lines (318 loc) · 14.9 KB
/
Selenium_Util.py
File metadata and controls
354 lines (318 loc) · 14.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
import os
# shutil: high-level wrapper over os for moving and deleting files/directories
import shutil
# subprocess: run external commands (used to query the Edge version from the Windows registry)
import subprocess
from datetime import datetime
from pprint import pprint
import re
from time import sleep
import json
from selenium import webdriver
from selenium.common import StaleElementReferenceException, SessionNotCreatedException, TimeoutException, \
ElementNotInteractableException,ElementClickInterceptedException
from selenium.webdriver import ActionChains
from selenium.webdriver.edge.options import Options
from selenium.webdriver.edge.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.microsoft import EdgeChromiumDriverManager
from selenium.webdriver.support.wait import WebDriverWait
from functools import wraps
class Selenium_Edge:
    """Convenience wrapper around a Selenium-driven Microsoft Edge session.

    The class owns a single ``webdriver.Edge`` instance (``self.driver``) and
    an ``ActionChains`` bound to it (``self.action``).  It offers:

    * driver bootstrap, optionally refreshing ``./drive/msedgedriver.exe``;
    * ``*_element`` / ``*_elements`` helpers that take a locator, wait for
      the element(s) and then act on them (built with the ``locate_element``
      and ``locate_elements`` decorators);
    * plain helpers (``get_text``, ``click``, ...) that act on an element
      that has already been located;
    * cookie persistence to/from ``cookies.json`` and window switching.
    """

    def __init__(self, update=False, cookies=False, cookies_url=None, proxy_url=None, action=None):
        """Store the configuration and immediately start the browser.

        :param update: when true, check/refresh the Edge driver before launch.
        :param cookies: when true, open ``cookies_url`` and load ``cookies.json``.
        :param cookies_url: page to open before the cookies are added.
        :param proxy_url: value passed to Edge as ``--proxy-server``.
        :param action: stored on the instance, but replaced by a fresh
            ``ActionChains`` as soon as the driver has been created.
        """
        self.update = update
        self.cookies = cookies
        self.cookies_url = cookies_url
        self.proxy_url = proxy_url
        self.action = action
        self.driver = self.create_driver()

    def create_driver(self):
        """Create the Edge driver, set ``self.action`` and return the driver.

        Downloads are directed to the current working directory and the
        browser is started with the ``detach`` option.

        :raises ValueError: if ``cookies`` is enabled but ``cookies_url`` is
            empty (checked before a browser is launched).
        """
        if self.cookies and not self.cookies_url:
            # Fail early instead of launching a browser and then failing on
            # driver.get(None).
            raise ValueError("cookies=True requires a cookies_url")

        service = Service('./drive/msedgedriver.exe')
        download_path = os.getcwd()
        print(f"下载目录:\n{download_path}")

        options = Options()
        options.add_experimental_option("detach", True)
        if self.proxy_url:
            # Route traffic through the configured tunnel host/port.
            options.add_argument(f'--proxy-server={self.proxy_url}')
        # Download location.
        options.add_experimental_option("prefs", {"download.default_directory": download_path})

        if self.update:
            Selenium_Edge.update_driver()

        driver = webdriver.Edge(service=service, options=options)
        if self.cookies:
            # A page must be open before cookies can be added to the session.
            driver.get(self.cookies_url)
            self.set_cookies(driver)

        self.action = ActionChains(driver)
        return driver

    @staticmethod
    def _get_edge_version():
        """Return the installed Edge version read from the Windows registry.

        On failure a descriptive (non-version) string is returned instead of
        raising; the caller only compares the value for equality.
        """
        # Constant command string; no external input is interpolated.
        command = r'reg query "HKEY_CURRENT_USER\Software\Microsoft\Edge\BLBeacon" /v version'
        try:
            output = subprocess.check_output(command, shell=True, text=True, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            return f"命令执行错误: {e.output}"
        version_match = re.search(r"version\s+REG_SZ\s+([\d.]+)", output)
        if version_match:
            return version_match.group(1)
        return "Edge 版本号未找到"

    @staticmethod
    def _get_driver_version(driver_path):
        """Probe the driver at ``driver_path`` by starting a session with it.

        Returns ``"0.0.0.0"`` when the driver file is missing, the session's
        ``browserVersion`` capability when a session can be created, and
        otherwise the version number parsed from the
        ``SessionNotCreatedException`` message (``"0.0.0.0"`` if the message
        cannot be parsed).
        """
        if not os.path.exists(driver_path):
            print("驱动不存在。")
            return "0.0.0.0"

        print("正在测试驱动版本")
        service = Service(driver_path)
        try:
            driver = webdriver.Edge(service=service)
            version = driver.capabilities['browserVersion']
            driver.quit()
        except SessionNotCreatedException as e:
            message = str(e)
            # Extract version information from the exception text.
            match = re.search(r"Microsoft Edge version (\d+)", message)
            if not match:
                # Unparseable message: report a sentinel that never equals a
                # real browser version so the caller refreshes the driver.
                return "0.0.0.0"
            unsupported_version = match.group(1)
            print(f"驱动不支持的版本是:{unsupported_version}")
            current_version_match = re.search(
                r"Current browser version is (\d+\.\d+\.\d+\.\d+)", message)
            if current_version_match:
                current_version = current_version_match.group(1)
                print(f"驱动不支持的版本是: {current_version}")
            return unsupported_version
        return version

    @staticmethod
    def _delete_directory(target_directory):
        """Remove ``target_directory`` recursively, reporting the outcome."""
        if not os.path.exists(target_directory):
            print(f"{target_directory} 不存在。")
            return
        try:
            shutil.rmtree(target_directory)
            print(f"{target_directory} 已成功删除。")
        except Exception as e:
            # Best-effort cleanup: report the failure and carry on.
            print(f"删除 {target_directory} 时出错: {e}")

    # Refresh the driver.
    @staticmethod
    def update_driver():
        """Download a matching Edge driver into ``./drive`` when needed.

        The browser version (registry) is compared with the value reported by
        probing ``./drive/msedgedriver.exe``.  When they differ, the driver is
        downloaded with ``EdgeChromiumDriverManager``, its folder is moved to
        ``<cwd>/drive`` (replacing any existing folder) and the ``.wdm`` and
        ``.cache/selenium`` directories next to the download are deleted.
        """
        browser_version = Selenium_Edge._get_edge_version()
        driver_version = Selenium_Edge._get_driver_version("./drive/msedgedriver.exe")
        print(f"您的浏览器版本{browser_version}\n驱动版本{driver_version}")

        if browser_version == driver_version:
            print("驱动已更新")
            return

        print(".........正在更新驱动.........")
        driver_path = EdgeChromiumDriverManager().install()
        print(f"正在下载驱动,默认位置:\n{driver_path}")

        # Folder that contains the freshly downloaded driver.
        driver_directory = os.path.dirname(driver_path)
        # Destination: "<current working directory>/drive".
        target_directory = os.path.join(os.getcwd(), "drive")
        if os.path.exists(target_directory):
            shutil.rmtree(target_directory)
        shutil.move(driver_directory, target_directory)
        print(f"正在从\n{driver_path}\n移动驱动至\n{target_directory}")

        # First three path components of the download location (e.g.
        # "C:\Users\<name>").  They are re-joined with os.sep rather than
        # os.path.join, because os.path.join("C:", "Users") is the
        # drive-relative "C:Users" and a leading "" component would be
        # dropped on POSIX.
        base_directory = os.sep.join(driver_directory.split(os.sep)[:3])

        # Remove the download directory and the Selenium cache directory.
        wdm_directory = os.path.join(base_directory, ".wdm")
        print(f"正在删除驱动目录:\n{wdm_directory}")
        Selenium_Edge._delete_directory(wdm_directory)
        selenium_cache_directory = os.path.join(base_directory, ".cache", "selenium")
        print(f"正在删除缓存目录:\n{selenium_cache_directory}")
        Selenium_Edge._delete_directory(selenium_cache_directory)
        print(".........更新驱动完毕.........")

    def get_driver(self, url, wait_time=10, wait_element_xpath=None):
        """Open ``url`` and optionally wait for an element to be present.

        If loading raises ``TimeoutException`` the load is retried once and
        the method returns without waiting for ``wait_element_xpath``.

        :param url: address to open.
        :param wait_time: seconds to wait for ``wait_element_xpath``.
        :param wait_element_xpath: XPath to wait for after loading, if given.
        """
        try:
            self.driver.get(url)
        except TimeoutException:
            print(f"页面在 {wait_time} 秒内未完全加载或未找到元素:{wait_element_xpath}")
            self.driver.get(url)
            return
        # When an XPath is supplied, wait until that element is present.
        if wait_element_xpath:
            try:
                WebDriverWait(self.driver, wait_time).until(
                    EC.presence_of_element_located((By.XPATH, wait_element_xpath))
                )
            except TimeoutException:
                print(f"页面在 {wait_time} 秒内未完全加载或未找到元素:{wait_element_xpath}")

    # NOTE: using a staticmethod object as a decorator inside the class body
    # relies on staticmethod objects being callable (Python 3.10+).
    @staticmethod
    def locate_element(func):
        """Decorator: locate one element, then call ``func(self, element, ...)``.

        The decorated method is called as
        ``method(xpath, xpath_kind=By.XPATH, father_element=None,
        default_value=False, timeout=3, is_watch=False, *args, **kwargs)``.

        * ``father_element`` is the search root (``self.driver`` when None).
        * ``is_watch`` waits for visibility instead of mere presence.
        * ``default_value`` is returned when the wait raises
          ``TimeoutException`` or ``AttributeError``; it is consumed here and
          NOT forwarded to ``func``.
        * Remaining ``*args`` / ``**kwargs`` are forwarded to ``func``.
        """
        @wraps(func)
        def wrapper(self, xpath, xpath_kind=By.XPATH, father_element=None, default_value=False,
                    timeout=3, is_watch=False, *args, **kwargs):
            if father_element is None:
                father_element = self.driver
            if is_watch:
                condition = EC.visibility_of_element_located((xpath_kind, xpath))
            else:
                condition = EC.presence_of_element_located((xpath_kind, xpath))
            try:
                element = WebDriverWait(father_element, timeout).until(condition)
            except (TimeoutException, AttributeError):
                # Element not found: skip func and hand back the default.
                return default_value
            # Element found: run the wrapped method on it.
            return func(self, element, *args, **kwargs)
        return wrapper

    @locate_element
    def get_element(self, element, default_value="get_element_error"):
        """Return the located element (``default_value`` if it is None)."""
        return element if element is not None else default_value

    @locate_element
    def get_element_xpath(self, element, tag_name=False, innerHTML=False, default_value="get_xpath_error"):
        """Locate an element and return its direct children (see ``get_xpath``)."""
        return self.get_xpath(element, tag_name=tag_name, innerHTML=innerHTML,
                              default_value=default_value)

    def get_xpath(self, elements, tag_name=False, innerHTML=False, default_value="get_xpath_error"):
        """Return the direct child elements of ``elements``.

        :param elements: element whose children (``./*``) are collected.
        :param tag_name: when true, print each child's tag name.
        :param innerHTML: when true, print each child's inner HTML.
        :param default_value: accepted for signature symmetry; not used.
        """
        child_elements = elements.find_elements(By.XPATH, "./*")
        for child in child_elements:
            if tag_name:
                print(child.tag_name)
            if innerHTML:
                # Inner HTML is handy for inspecting the structure.
                print(child.get_attribute('innerHTML'))
        return child_elements

    @locate_element
    def get_element_text(self, element, default_value="get_text_error"):
        """Return the element's stripped text.

        Empty text yields the string ``"None"`` (not the ``None`` object);
        any exception while reading the text yields ``default_value``.
        """
        try:
            text = element.text.strip()
            return text if text else "None"
        except Exception:
            return default_value

    def get_text(self, element):
        """Return the stripped text of an already located element."""
        return element.text.strip()

    @locate_element
    def input_element_text(self, element, text):
        """Locate an element, clear it and type ``text`` into it."""
        self.input_text(element, text)

    def input_text(self, element, text):
        """Clear an already located element and type ``text`` into it."""
        element.clear()
        element.send_keys(text)

    @locate_element
    def get_element_attribute(self, element, attribute='href', default_value="attribute_error"):
        """Locate an element and return one of its attributes (see ``get_attribute``)."""
        return self.get_attribute(element, attribute=attribute, default_value=default_value)

    def get_attribute(self, element, attribute='href', default_value="attribute_error"):
        """Return ``attribute`` of ``element``; ``default_value`` on any error.

        Commonly used attribute names:
        ``'textContent'`` (text inside the tag), ``'innerHTML'`` (the tag's
        inner HTML), ``'outerHTML'`` (the complete HTML of the tag) and
        ``'href'`` (link target).
        """
        try:
            return element.get_attribute(attribute)
        except Exception:
            return default_value

    @locate_element
    def click_element(self, element):
        """Locate an element and click it; print a notice if the click is refused."""
        try:
            element.click()
        except (ElementNotInteractableException, ElementClickInterceptedException):
            print("点击失败!")

    def click(self, element):
        """Click an already located element."""
        element.click()

    @locate_element
    def hover_element(self, element, default_value="hover_error"):
        """Locate an element and move the mouse over it.

        Returns None on success and ``default_value`` if the action raises.
        """
        try:
            # Simulate the hover with an ActionChains sequence.
            ActionChains(self.driver).move_to_element(element).perform()
        except Exception:
            return default_value

    @staticmethod
    def locate_elements(func):
        """Decorator: locate all matching elements, then call ``func(self, elements, ...)``.

        The decorated method is called as
        ``method(xpath, xpath_kind=By.XPATH, father_element=None,
        default_value="Null", *args, timeout=3, **kwargs)``.

        ``timeout`` is keyword-only so that existing positional calls keep
        their meaning.  ``default_value`` is returned when locating the
        elements or running ``func`` raises ``TimeoutException`` or
        ``AttributeError``; it is not forwarded to ``func``.
        """
        @wraps(func)
        def wrapper(self, xpath, xpath_kind=By.XPATH, father_element=None, default_value="Null",
                    *args, timeout=3, **kwargs):
            if father_element is None:
                father_element = self.driver
            try:
                elements = WebDriverWait(father_element, timeout).until(
                    EC.presence_of_all_elements_located((xpath_kind, xpath))
                )
                return func(self, elements, *args, **kwargs)
            except (TimeoutException, AttributeError):
                return default_value
        return wrapper

    # Bulk helpers for elements of the same kind; special elements should be
    # handled individually with the single-element methods.
    @locate_elements
    def get_elements(self, elements, default_value="get_elements_error"):
        """Return the located elements (``[default_value]`` if the list is empty)."""
        return elements if elements else [default_value]

    @locate_elements
    def get_elements_text(self, elements, default_value="elements_text_error"):
        """Return the stripped text of every located element.

        Empty text is recorded as the string ``"None"``; an element whose
        text cannot be read is recorded as ``default_value``.
        """
        text_list = []
        for element in elements:
            try:
                text = element.text.strip()
                text_list.append(text if text else "None")
            except Exception:
                text_list.append(default_value)
        return text_list

    def get_cookies(self):
        """Save the session's cookies to ``cookies.json`` and return them.

        Only the name -> value mapping is kept.
        """
        cookie_dict = {cookie['name']: cookie['value'] for cookie in self.driver.get_cookies()}
        with open('cookies.json', 'w', encoding='utf-8') as file:
            json.dump(cookie_dict, file)
        print("Cookies已保存至cookies.json")
        return cookie_dict

    def set_cookies(self, driver):
        """Load name/value pairs from ``cookies.json`` and add them to ``driver``."""
        with open('cookies.json', 'r', encoding='utf-8') as file:
            cookies = json.load(file)
        # One add_cookie call per stored cookie.
        for name, value in cookies.items():
            driver.add_cookie({'name': name, 'value': value})
        print("已从cookies.json中加载cookies")

    def destroy(self):
        """Quit the browser and end the WebDriver session."""
        self.driver.quit()

    # Original (misspelled) public name, kept so existing callers keep working.
    destory = destroy

    def switch_to_window_by_index(self, index):
        """Switch to the window at position ``index`` in the handle list.

        :param index: position in ``driver.window_handles``, starting at 0.

        Problems are reported with ``print`` rather than raised.
        """
        try:
            # All window handles of the current session.
            window_handles = self.driver.window_handles
            if index < len(window_handles):
                self.driver.switch_to.window(window_handles[index])
                print(f"已切换到窗口: {self.driver.title}")
            else:
                print(f"索引 {index} 超出窗口句柄列表范围。")
        except Exception as e:
            print(f"切换窗口时出错: {e}")