From cc6ed5183ebad74e769dcd3a846dd82c51d101dc Mon Sep 17 00:00:00 2001 From: maitrisavaliya Date: Tue, 9 Dec 2025 11:22:07 +0530 Subject: [PATCH] Add 5 major improvements: rate limiting, CSV export, retries, checkpoints, and enhanced validation --- scanner.py | 402 ++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 355 insertions(+), 47 deletions(-) diff --git a/scanner.py b/scanner.py index 9d0bdb2..32e872d 100755 --- a/scanner.py +++ b/scanner.py @@ -11,6 +11,7 @@ CVE-2025-55182 & CVE-2025-66478 Based on research from Assetnote Security Research Team. +Enhanced with rate limiting, CSV export, retry logic, and checkpoint system. """ import argparse @@ -20,6 +21,9 @@ import random import re import string +import csv +import threading +import time from datetime import datetime, timezone from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import urlparse @@ -51,6 +55,54 @@ class Colors: RESET = "\033[0m" +class RateLimiter: + """Simple token bucket rate limiter for controlling scan speed.""" + + def __init__(self, requests_per_second: float = 10.0): + """ + Initialize rate limiter. + + Args: + requests_per_second: Maximum requests per second (default: 10) + """ + self.rate = requests_per_second + self.tokens = requests_per_second + self.max_tokens = requests_per_second + self.last_update = datetime.now(timezone.utc) + self.lock = threading.Lock() + + def acquire(self, tokens: int = 1) -> bool: + """ + Attempt to acquire tokens for a request. + Blocks if tokens unavailable. + + Args: + tokens: Number of tokens to acquire (default: 1) + + Returns: + True when tokens acquired + """ + with self.lock: + while True: + now = datetime.now(timezone.utc) + elapsed = (now - self.last_update).total_seconds() + + # Add tokens based on elapsed time + self.tokens = min( + self.max_tokens, + self.tokens + (elapsed * self.rate) + ) + self.last_update = now + + if self.tokens >= tokens: + self.tokens -= tokens + return True + + # Wait for tokens to refill + sleep_time = (tokens - self.tokens) / self.rate + time.sleep(sleep_time) + + def colorize(text: str, color: str) -> str: """Apply color to text.""" return f"{color}{text}{Colors.RESET}" @@ -239,31 +291,71 @@ def resolve_redirects(url: str, timeout: int, verify_ssl: bool, max_redirects: i return current_url -def send_payload(target_url: str, headers: dict, body: str, timeout: int, verify_ssl: bool) -> Tuple[Optional[requests.Response], Optional[str]]: - """Send the exploit payload to a URL. Returns (response, error).""" - try: - # Encode body as bytes to ensure proper Content-Length calculation - # and avoid potential encoding issues with the HTTP client - body_bytes = body.encode('utf-8') if isinstance(body, str) else body - response = requests.post( - target_url, - headers=headers, - data=body_bytes, - timeout=timeout, - verify=verify_ssl, - allow_redirects=False - ) - return response, None - except requests.exceptions.SSLError as e: - return None, f"SSL Error: {str(e)}" - except requests.exceptions.ConnectionError as e: - return None, f"Connection Error: {str(e)}" - except requests.exceptions.Timeout: - return None, "Request timed out" - except RequestException as e: - return None, f"Request failed: {str(e)}" - except Exception as e: - return None, f"Unexpected error: {str(e)}" +def send_payload(target_url: str, headers: dict, body: str, timeout: int, verify_ssl: bool, max_retries: int = 3) -> Tuple[Optional[requests.Response], Optional[str]]: + """ + Send the exploit payload to a URL with retry logic. + + Args: + target_url: URL to send payload to + headers: HTTP headers + body: Request body + timeout: Request timeout + verify_ssl: Whether to verify SSL certificates + max_retries: Maximum number of retry attempts (default: 3) + + Returns: + Tuple of (response, error_message) + """ + body_bytes = body.encode('utf-8') if isinstance(body, str) else body + + # Errors that should be retried + retryable_errors = ( + requests.exceptions.ConnectionError, + requests.exceptions.Timeout, + requests.exceptions.ChunkedEncodingError, + ) + + last_error = None + for attempt in range(max_retries): + try: + response = requests.post( + target_url, + headers=headers, + data=body_bytes, + timeout=timeout, + verify=verify_ssl, + allow_redirects=False + ) + return response, None + + except requests.exceptions.SSLError as e: + # SSL errors shouldn't be retried + return None, f"SSL Error: {str(e)}" + + except retryable_errors as e: + last_error = e + + # Don't retry on last attempt + if attempt < max_retries - 1: + # Exponential backoff: 1s, 2s, 4s + backoff_time = 2 ** attempt + time.sleep(backoff_time) + continue + else: + # Final attempt failed + if isinstance(e, requests.exceptions.Timeout): + return None, "Request timed out" + else: + return None, f"Connection Error: {str(e)}" + + except RequestException as e: + return None, f"Request failed: {str(e)}" + + except Exception as e: + return None, f"Unexpected error: {str(e)}" + + # Shouldn't reach here, but just in case + return None, f"Max retries exceeded: {str(last_error)}" def is_vulnerable_safe_check(response: requests.Response) -> bool: @@ -290,7 +382,7 @@ def is_vulnerable_rce_check(response: requests.Response) -> bool: return bool(re.search(r'.*/login\?a=11111.*', redirect_header)) -def check_vulnerability(host: str, timeout: int = 10, verify_ssl: bool = True, follow_redirects: bool = True, custom_headers: Optional[dict[str, str]] = None, safe_check: bool = False, windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128, vercel_waf_bypass: bool = False, paths: Optional[list[str]] = None) -> dict: +def check_vulnerability(host: str, timeout: int = 10, verify_ssl: bool = True, follow_redirects: bool = True, custom_headers: Optional[dict[str, str]] = None, safe_check: bool = False, windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128, vercel_waf_bypass: bool = False, paths: Optional[list[str]] = None, max_retries: int = 3) -> dict: """ Check if a host is vulnerable to CVE-2025-55182/CVE-2025-66478. @@ -351,7 +443,7 @@ def check_vulnerability(host: str, timeout: int = 10, verify_ssl: bool = True, f def build_request_str(url: str) -> str: parsed = urlparse(url) - req_str = f"POST {'/aaa' or '/aaa'} HTTP/1.1\r\n" + req_str = f"POST {parsed.path or '/'} HTTP/1.1\r\n" req_str += f"Host: {parsed.netloc}\r\n" for k, v in headers.items(): req_str += f"{k}: {v}\r\n" @@ -379,7 +471,7 @@ def build_response_str(resp: requests.Response) -> str: result["final_url"] = test_url result["request"] = build_request_str(test_url) - response, error = send_payload(test_url, headers, body, timeout, verify_ssl) + response, error = send_payload(test_url, headers, body, timeout, verify_ssl, max_retries) if error: # In RCE mode, timeouts indicate not vulnerable (patched servers hang) @@ -409,7 +501,7 @@ def build_response_str(resp: requests.Response) -> str: redirect_url = resolve_redirects(test_url, timeout, verify_ssl) if redirect_url != test_url: # Different path, test it - response, error = send_payload(redirect_url, headers, body, timeout, verify_ssl) + response, error = send_payload(redirect_url, headers, body, timeout, verify_ssl, max_retries) if error: # Continue to next path @@ -471,21 +563,149 @@ def load_paths(paths_file: str) -> list[str]: def save_results(results: list[dict], output_file: str, vulnerable_only: bool = True): + """Save scan results to JSON file with validation and detailed error handling.""" if vulnerable_only: results = [r for r in results if r.get("vulnerable") is True] output = { "scan_time": datetime.now(timezone.utc).isoformat() + "Z", + "scanner_version": "1.1.0", + "total_scanned": len([r for r in results if r.get("vulnerable") is not None]), + "total_vulnerable": len([r for r in results if r.get("vulnerable") is True]), + "total_errors": len([r for r in results if r.get("error") is not None]), "total_results": len(results), "results": results } + # Validate output directory exists + output_dir = os.path.dirname(output_file) + if output_dir and not os.path.exists(output_dir): + try: + os.makedirs(output_dir, exist_ok=True) + except OSError as e: + print(colorize(f"\n[ERROR] Cannot create output directory '{output_dir}': {e}", Colors.RED)) + return False + + # Validate JSON serialization before writing + try: + json_str = json.dumps(output, indent=2) + except (TypeError, ValueError) as e: + print(colorize(f"\n[ERROR] Cannot serialize results to JSON: {e}", Colors.RED)) + return False + + # Write to file with atomic write (temp file + rename) + temp_file = f"{output_file}.tmp" try: - with open(output_file, "w") as f: - json.dump(output, f, indent=2) + with open(temp_file, "w", encoding="utf-8") as f: + f.write(json_str) + # Atomic rename + os.replace(temp_file, output_file) print(colorize(f"\n[+] Results saved to: {output_file}", Colors.GREEN)) + print(colorize(f" Total scanned: {output['total_scanned']}", Colors.CYAN)) + print(colorize(f" Vulnerable: {output['total_vulnerable']}", Colors.RED if output['total_vulnerable'] > 0 else Colors.GREEN)) + print(colorize(f" Errors: {output['total_errors']}", Colors.YELLOW if output['total_errors'] > 0 else Colors.GREEN)) + return True + except PermissionError: + print(colorize(f"\n[ERROR] Permission denied writing to '{output_file}'", Colors.RED)) + if os.path.exists(temp_file): + os.remove(temp_file) + return False + except OSError as e: + print(colorize(f"\n[ERROR] Failed to write results: {e}", Colors.RED)) + if os.path.exists(temp_file): + os.remove(temp_file) + return False + + +def save_results_csv(results: list[dict], output_file: str, vulnerable_only: bool = True): + """ + Save scan results to CSV file for easy import into spreadsheets. + + Args: + results: List of scan result dictionaries + output_file: Path to output CSV file + vulnerable_only: Only save vulnerable hosts + """ + if vulnerable_only: + results = [r for r in results if r.get("vulnerable") is True] + + if not results: + print(colorize("\n[!] No results to save", Colors.YELLOW)) + return False + + # Define CSV columns + fieldnames = [ + "host", + "vulnerable", + "status_code", + "tested_url", + "final_url", + "redirected", + "error", + "timestamp" + ] + + # Prepare rows + csv_rows = [] + for r in results: + row = { + "host": r.get("host", ""), + "vulnerable": "YES" if r.get("vulnerable") else "NO" if r.get("vulnerable") is False else "ERROR", + "status_code": r.get("status_code", ""), + "tested_url": r.get("tested_url", ""), + "final_url": r.get("final_url", ""), + "redirected": "YES" if (r.get("final_url") and r.get("tested_url") and r.get("final_url") != r.get("tested_url")) else "NO", + "error": r.get("error", ""), + "timestamp": r.get("timestamp", "") + } + csv_rows.append(row) + + try: + with open(output_file, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(csv_rows) + + print(colorize(f"\n[+] CSV results saved to: {output_file}", Colors.GREEN)) + print(colorize(f" Total records: {len(csv_rows)}", Colors.CYAN)) + return True + except Exception as e: + print(colorize(f"\n[ERROR] Failed to save CSV: {e}", Colors.RED)) + return False + + +def save_checkpoint(results: list[dict], checkpoint_file: str): + """Save current scan progress to checkpoint file.""" + try: + with open(checkpoint_file, "w") as f: + json.dump({ + "timestamp": datetime.now(timezone.utc).isoformat() + "Z", + "completed_hosts": [r["host"] for r in results], + "results": results + }, f, indent=2) + except Exception as e: + print(colorize(f"\n[WARNING] Failed to save checkpoint: {e}", Colors.YELLOW)) + + +def load_checkpoint(checkpoint_file: str) -> Tuple[set, list[dict]]: + """ + Load checkpoint file to resume scan. + + Returns: + Tuple of (completed_hosts_set, previous_results) + """ + try: + with open(checkpoint_file, "r") as f: + data = json.load(f) + completed = set(data.get("completed_hosts", [])) + results = data.get("results", []) + print(colorize(f"\n[+] Loaded checkpoint: {len(completed)} hosts already scanned", Colors.GREEN)) + return completed, results + except FileNotFoundError: + return set(), [] except Exception as e: - print(colorize(f"\n[ERROR] Failed to save results: {e}", Colors.RED)) + print(colorize(f"\n[WARNING] Failed to load checkpoint: {e}", Colors.YELLOW)) + return set(), [] def print_result(result: dict, verbose: bool = False): @@ -523,7 +743,7 @@ def print_result(result: dict, verbose: bool = False): def main(): parser = argparse.ArgumentParser( - description="React2Shell Scanner", + description="React2Shell Scanner - Enhanced with rate limiting, CSV export, retry logic, and checkpoints", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: @@ -534,6 +754,8 @@ def main(): %(prog)s -u https://example.com --path /_next %(prog)s -u https://example.com --path /_next --path /api %(prog)s -u https://example.com --path-file paths.txt + %(prog)s -l hosts.txt --rate-limit 5.0 -o results.csv + %(prog)s -l large_list.txt --checkpoint scan.json --resume """ ) @@ -563,7 +785,7 @@ def main(): parser.add_argument( "-o", "--output", - help="Output file for results (JSON format)" + help="Output file for results. Format detected by extension (.json or .csv)" ) parser.add_argument( @@ -649,6 +871,31 @@ def main(): help="File containing list of paths to test (one per line, e.g., '/_next', '/api')" ) + parser.add_argument( + "--rate-limit", + type=float, + metavar="RPS", + help="Limit requests per second (e.g., 10.0 for 10 requests/sec)" + ) + + parser.add_argument( + "--retries", + type=int, + default=3, + help="Number of retries for failed requests (default: 3)" + ) + + parser.add_argument( + "--checkpoint", + help="Enable checkpoint file for resume capability (e.g., scan_checkpoint.json)" + ) + + parser.add_argument( + "--resume", + action="store_true", + help="Resume from checkpoint file if it exists" + ) + args = parser.parse_args() if args.no_color or not sys.stdout.isatty(): @@ -691,12 +938,28 @@ def main(): if args.waf_bypass and args.timeout == 10: timeout = 20 + # Load checkpoint if resuming + results = [] + completed_hosts = set() + if args.checkpoint and args.resume and os.path.exists(args.checkpoint): + completed_hosts, previous_results = load_checkpoint(args.checkpoint) + results.extend(previous_results) + + # Filter out already completed hosts + if completed_hosts: + original_count = len(hosts) + hosts = [h for h in hosts if normalize_host(h) not in completed_hosts] + if not args.quiet: + print(colorize(f"[*] Skipping {len(completed_hosts)} already scanned hosts", Colors.CYAN)) + print(colorize(f"[*] Remaining hosts to scan: {len(hosts)}", Colors.CYAN)) + if not args.quiet: print(colorize(f"[*] Loaded {len(hosts)} host(s) to scan", Colors.CYAN)) if paths: print(colorize(f"[*] Testing {len(paths)} path(s): {', '.join(paths)}", Colors.CYAN)) print(colorize(f"[*] Using {args.threads} thread(s)", Colors.CYAN)) print(colorize(f"[*] Timeout: {timeout}s", Colors.CYAN)) + print(colorize(f"[*] Retries: {args.retries}", Colors.CYAN)) if args.safe_check: print(colorize("[*] Using safe side-channel check", Colors.CYAN)) else: @@ -707,13 +970,16 @@ def main(): print(colorize(f"[*] WAF bypass enabled ({args.waf_bypass_size}KB junk data)", Colors.CYAN)) if args.vercel_waf_bypass: print(colorize("[*] Vercel WAF bypass mode enabled", Colors.CYAN)) + if args.rate_limit: + print(colorize(f"[*] Rate limiting: {args.rate_limit} requests/sec", Colors.CYAN)) + if args.checkpoint: + print(colorize(f"[*] Checkpoint file: {args.checkpoint}", Colors.CYAN)) if args.insecure: print(colorize("[!] SSL verification disabled", Colors.YELLOW)) print() - results = [] - vulnerable_count = 0 - error_count = 0 + vulnerable_count = len([r for r in results if r.get("vulnerable") is True]) + error_count = len([r for r in results if r.get("error") is not None]) verify_ssl = not args.insecure custom_headers = parse_headers(args.headers) @@ -722,19 +988,45 @@ def main(): import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + # Initialize rate limiter if needed + rate_limiter = None + if args.rate_limit: + rate_limiter = RateLimiter(requests_per_second=args.rate_limit) + if len(hosts) == 1: - result = check_vulnerability(hosts[0], timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass, paths=paths) + result = check_vulnerability( + hosts[0], timeout, verify_ssl, + custom_headers=custom_headers, + safe_check=args.safe_check, + windows=args.windows, + waf_bypass=args.waf_bypass, + waf_bypass_size_kb=args.waf_bypass_size, + vercel_waf_bypass=args.vercel_waf_bypass, + paths=paths, + max_retries=args.retries + ) results.append(result) if not args.quiet or result["vulnerable"]: print_result(result, args.verbose) if result["vulnerable"]: - vulnerable_count = 1 + vulnerable_count += 1 else: with ThreadPoolExecutor(max_workers=args.threads) as executor: - futures = { - executor.submit(check_vulnerability, host, timeout, verify_ssl, custom_headers=custom_headers, safe_check=args.safe_check, windows=args.windows, waf_bypass=args.waf_bypass, waf_bypass_size_kb=args.waf_bypass_size, vercel_waf_bypass=args.vercel_waf_bypass, paths=paths): host - for host in hosts - } + futures = {} + for host in hosts: + # Rate limit before submitting + if rate_limiter: + rate_limiter.acquire() + + future = executor.submit( + check_vulnerability, + host, timeout, verify_ssl, + True, custom_headers, + args.safe_check, args.windows, + args.waf_bypass, args.waf_bypass_size, + args.vercel_waf_bypass, paths, args.retries + ) + futures[future] = host with tqdm( total=len(hosts), @@ -747,6 +1039,10 @@ def main(): result = future.result() results.append(result) + # Save checkpoint every 10 hosts + if args.checkpoint and len(results) % 10 == 0: + save_checkpoint(results, args.checkpoint) + if result["vulnerable"]: vulnerable_count += 1 tqdm.write("") @@ -762,24 +1058,36 @@ def main(): pbar.update(1) + # Save final checkpoint + if args.checkpoint: + save_checkpoint(results, args.checkpoint) + if not args.quiet: print() print(colorize("=" * 60, Colors.CYAN)) print(colorize("SCAN SUMMARY", Colors.BOLD)) print(colorize("=" * 60, Colors.CYAN)) - print(f" Total hosts scanned: {len(hosts)}") + print(f" Total hosts scanned: {len(results)}") if vulnerable_count > 0: print(f" {colorize(f'Vulnerable: {vulnerable_count}', Colors.RED + Colors.BOLD)}") else: print(f" Vulnerable: {vulnerable_count}") - print(f" Not vulnerable: {len(hosts) - vulnerable_count - error_count}") + print(f" Not vulnerable: {len(results) - vulnerable_count - error_count}") print(f" Errors: {error_count}") print(colorize("=" * 60, Colors.CYAN)) if args.output: - save_results(results, args.output, vulnerable_only=not args.all_results) + output_lower = args.output.lower() + if output_lower.endswith('.csv'): + save_results_csv(results, args.output, vulnerable_only=not args.all_results) + elif output_lower.endswith('.json'): + save_results(results, args.output, vulnerable_only=not args.all_results) + else: + # Default to JSON if no extension + print(colorize("[!] No file extension detected, defaulting to JSON format", Colors.YELLOW)) + save_results(results, args.output, vulnerable_only=not args.all_results) if vulnerable_count > 0: sys.exit(1) @@ -787,4 +1095,4 @@ def main(): if __name__ == "__main__": - main() + main() \ No newline at end of file