DrissionPage 请求一次换一个代理(不重启chrome)
实现原理:通过插件实现
# !/usr/bin/python3
# -*- coding:utf-8 -*-
"""
@author: JHC000abc@gmail.com
@file: switch_ip.py
@time: 2025/4/23 22:05
@desc:"""R"""
1. chrome s商店下载Proxy SwitchyOmega 3 (ZeroOmega)插件 并且到 C:\Users\v_jiaohaicheng\AppData\Local\Google\Chrome\User Data\Default\Extensions\pfnededegaaopdmhkdmcofjmoldfiped\3.3.23_0 中复制全部内容
替换 https://github.com/zero-peak/ZeroOmega/releases 里的.xpi 文件解压后的文件 以保证插件能够正常被加载
2. 插件配置页 chrome-extension://pfnededegaaopdmhkdmcofjmoldfiped/options.html#!/profile/proxy
3."""
import time
import atexit
from DrissionPage import WebPage, ChromiumOptions, SessionOptionsDEFAULT_UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0"
EXTENSIONS_ZEROOMEGA = r"D:\Project\Python\baidu\crowdtest\collection-script\CR\tests\dp\Extensions\zeroomega-3.3.23"
CHROME_PATH = r"C:\Program Files\Google\Chrome\Application\chrome.exe"class DrissionPageDemo(object):""""""def __init__(self):""""""atexit.register(self.exit_handler)self.page = Nonedef exit_handler(self):""":return:"""if self.page:self.page.quit(del_data=True)def get_page(self, ua=None, cookies=None, incognito=True, time_out=60):""":param ua::param cookies::param incognito::param time_out::param headless::return:"""if not self.page:co = ChromiumOptions()so = SessionOptions()if cookies:so.set_cookies(cookies)if not ua:ua = DEFAULT_UAco.set_user_agent(user_agent=ua)co.set_argument('--no-sandbox')co.set_argument("--disable-gpu")co.ignore_certificate_errors(incognito)co.add_extension(EXTENSIONS_ZEROOMEGA)# co.set_browser_path(CHROME_PATH)co.mute(True)co.auto_port()co.set_timeouts(page_load=time_out)self.page = WebPage(chromium_options=co, session_or_options=so)return self.pagedef get_js(self, ip, port):"""生成修改插件ip port的js:param ip::param port::return:"""js_code = """var tdElements = document.querySelectorAll('td[ng-if="proxyEditors[scheme].scheme"].ng-scope');if (tdElements.length > 1) {var firstTdElement = tdElements[0];var firstInputElement = firstTdElement.querySelector('input[type="text"]'); if (firstInputElement) {firstInputElement.value = '""" + ip + """';var event = new Event('input', {'bubbles': true,'cancelable': true});firstInputElement.dispatchEvent(event);console.log('IP 地址已输入:', firstInputElement.value);} else {console.log('第一个 <td> 元素未找到输入框');return false; // 返回 False}var secondTdElement = tdElements[1];var secondInputElement = secondTdElement.querySelector('input[type="number"]'); if (secondInputElement) {secondInputElement.value = '""" + port + """';var event = new Event('input', {'bubbles': true,'cancelable': true});secondInputElement.dispatchEvent(event);console.log('端口号已输入:', secondInputElement.value);var applyButton = document.querySelector('a[ng-click="applyOptions()"]');if (applyButton) {applyButton.click();console.log('“应用选项”按钮已点击');return true; // 返回 True} else {console.log('未找到“应用选项”按钮');return false; // 返回 False}} else {console.log('第二个 <td> 元素未找到输入框');return false; }} else {console.log('未找到足够的 td 元素');return false; }"""return js_codedef load_extensions_default_config(self):"""启动浏览器后先加载插件默认配置:return:"""url_config = "chrome-extension://pfnededegaaopdmhkdmcofjmoldfiped/options.html#!/io"config_url = "https://bj.bcebos.com/petite-mark/public_read/vipshop/ZeroOmegaOptions.bak"js = """var inputElement = document.querySelector('input[ng-model="restoreOnlineUrl"]');if (inputElement){inputElement.value = '""" + config_url + """'; var event = new Event('input');inputElement.dispatchEvent(event);}else{return false;}var buttonElement = document.querySelector('button[ng-click="restoreOnline()"]');if (buttonElement){buttonElement.click();return true;}else{return false;}"""self.page.get(url_config, retry=3, interval=3)self.page.wait.eles_loaded("#restoreOnline")status = self.page.run_js(js)self.page.wait.eles_loaded("#ladda-progress")self.page.wait(1)return statusdef switch_ip(self, ip, port):"""切换ip:return:"""url = "chrome-extension://pfnededegaaopdmhkdmcofjmoldfiped/options.html#!/profile/proxy"self.page.get(url, retry=3, interval=3)self.page.wait.eles_loaded("#restoreOnline")status = self.page.run_js(self.get_js(ip, port))self.page.wait(1)return statusdef get_ip_port(self):""":return:"""lis = [["121.232.124.122", "40003"], ["821.232.124.122", "80003"], ["921.232.124.122", "90003"]]for i in lis:yield idef check_ip(self):""":return:"""js = """return document.getElementById("tab0_ip").innerText;"""res = self.page.run_js(js)print(res)while res == "加载中..." or res == "":res = self.page.run_js(js)print(res)time.sleep(1)print(f"IP:{res}")return resdef process(self, url):""":param url::return:"""self.get_page()# 首次启动浏览器加载插件默认配置 保证插件以proxy模式运行if not self.load_extensions_default_config():print("加载插件默认配置失败")returnself.page.get(url)init_ip = self.check_ip()print(f"now ip is :{init_ip}")for ip, port in self.get_ip_port():if self.switch_ip(ip, port):try:self.page.get(url)change_ip = self.check_ip()if change_ip != init_ip:print(f"ip 切换为:{change_ip}")init_ip = change_ipelse:print("ip 切换失败/ip不可用")except:print("chrome connect failed")self.get_page()if __name__ == '__main__':page = DrissionPageDemo()url = "https://ip.cn/"page.process(url)
备注:ip port换成可用的就基本不会出现 chrome connect failed 错误