diff --git a/src/nssec/cli/main.py b/src/nssec/cli/main.py index 8fbe68a..46f5e25 100644 --- a/src/nssec/cli/main.py +++ b/src/nssec/cli/main.py @@ -191,9 +191,11 @@ def init(config_dir): # ─── REGISTER SUB-COMMAND GROUPS ─── from nssec.cli.audit import audit # noqa: E402 +from nssec.cli.mtls_commands import mtls # noqa: E402 from nssec.cli.waf_commands import waf # noqa: E402 cli.add_command(audit) +cli.add_command(mtls) cli.add_command(waf) diff --git a/src/nssec/cli/mtls_commands.py b/src/nssec/cli/mtls_commands.py new file mode 100644 index 0000000..4e5c5e3 --- /dev/null +++ b/src/nssec/cli/mtls_commands.py @@ -0,0 +1,175 @@ +"""mTLS management CLI commands for nssec.""" + +import click + +from nssec.cli import console + + +@click.group() +def mtls(): + """mTLS device provisioning management commands.""" + pass + + +@mtls.group("nodeping", invoke_without_command=True) +@click.pass_context +def mtls_nodeping(ctx): + """Manage NodePing monitoring IPs in mTLS config.""" + if ctx.invoked_subcommand is None: + ctx.invoke(nodeping_show) + + +@mtls_nodeping.command("show") +def nodeping_show(): + """Show current NodePing IPs in ndp_mtls.conf.""" + from nssec.modules.mtls import get_current_nodeping_ips + from nssec.modules.mtls.config import NDP_MTLS_CONF + from nssec.modules.mtls.utils import file_exists + + if not file_exists(NDP_MTLS_CONF): + console.print(f"[yellow]mTLS config not found:[/yellow] {NDP_MTLS_CONF}") + console.print("[dim]Is mTLSProtect installed?[/dim]") + return + + ips = get_current_nodeping_ips() + if not ips: + console.print("[dim]No NodePing IPs currently configured.[/dim]") + console.print("\nTo add NodePing IPs, run:") + console.print(" [cyan]sudo nssec mtls nodeping update[/cyan]") + return + + _display_ip_list(ips, "NodePing IPs") + + +@mtls_nodeping.command("fetch") +def nodeping_fetch(): + """Fetch and display NodePing IPs (dry run, no changes).""" + from nssec.modules.mtls.utils import fetch_nodeping_ips + + console.print("[bold]Fetching NodePing IPs...[/bold]") + ips, error = fetch_nodeping_ips() + + if error: + console.print(f"[red]Error:[/red] {error}") + raise SystemExit(1) + + console.print(f"\n[green]Fetched {len(ips)} IPs from NodePing[/green]\n") + _display_ip_list(ips, "Available IPs") + + console.print("\n[dim]This was a dry run. To apply, run:[/dim]") + console.print(" [cyan]sudo nssec mtls nodeping update[/cyan]") + + +def _require_root(command_name: str) -> None: + """Exit with error if not running as root.""" + from nssec.core.ssh import is_root + + if not is_root(): + console.print( + f"[red]Error: Must run as root (sudo nssec mtls nodeping {command_name})[/red]" + ) + raise SystemExit(1) + + +def _validate_and_reload(yes: bool) -> None: + """Validate Apache config and optionally reload.""" + from nssec.modules.mtls import reload_apache, rollback, validate_apache_config + from nssec.modules.mtls.config import NDP_MTLS_CONF + + val_result = validate_apache_config() + if not val_result.success: + console.print(f" [red]Error:[/red] {val_result.error}") + console.print("[yellow]Rolling back changes...[/yellow]") + rollback(NDP_MTLS_CONF) + raise SystemExit(1) + console.print(f" [green]Done:[/green] {val_result.message}") + + console.print() + if yes or click.confirm("Reload Apache to apply changes?"): + reload_result = reload_apache() + if reload_result.success: + console.print(f" [green]Done:[/green] {reload_result.message}") + else: + console.print(f" [red]Error:[/red] {reload_result.error}") + raise SystemExit(1) + else: + console.print("[yellow]Skipped Apache reload. Run manually:[/yellow]") + console.print(" [cyan]sudo systemctl reload apache2[/cyan]") + + +@mtls_nodeping.command("update") +@click.option("--yes", "-y", is_flag=True, help="Skip confirmation prompts") +@click.option("--dry-run", is_flag=True, help="Show what would be done without changes") +def nodeping_update(yes, dry_run): + """Fetch NodePing IPs and update ndp_mtls.conf.""" + from nssec.modules.mtls import update_nodeping_ips + + _require_root("update") + console.print("[bold]Updating NodePing IPs...[/bold]") + + result = update_nodeping_ips(dry_run=dry_run) + + if result.skipped: + console.print(f"[green]{result.message}[/green]") + return + + if not result.success: + console.print(f"[red]Error:[/red] {result.error}") + raise SystemExit(1) + + console.print(f" [green]Done:[/green] {result.message}") + + if dry_run: + console.print("\n[yellow]Dry run - no changes made.[/yellow]") + return + + _validate_and_reload(yes) + + +@mtls_nodeping.command("remove") +@click.option("--yes", "-y", is_flag=True, help="Skip confirmation") +def nodeping_remove(yes): + """Remove NodePing IPs section from ndp_mtls.conf.""" + from nssec.modules.mtls import remove_nodeping_ips + + _require_root("remove") + + console.print( + "[bold yellow]Warning:[/bold yellow] This will remove all NodePing IPs from mTLS config." + ) + console.print() + + if not yes and not click.confirm("Remove NodePing IPs section?"): + console.print("[yellow]Aborted.[/yellow]") + return + + result = remove_nodeping_ips() + + if result.skipped: + console.print(f"[dim]{result.message}[/dim]") + return + + if not result.success: + console.print(f"[red]Error:[/red] {result.error}") + raise SystemExit(1) + + console.print(f" [green]Done:[/green] {result.message}") + _validate_and_reload(yes) + + +def _display_ip_list(ips: list[str], title: str) -> None: + """Display a list of IPs grouped by version.""" + ipv4 = [ip for ip in ips if ":" not in ip] + ipv6 = [ip for ip in ips if ":" in ip] + + console.print(f"[bold]{title}[/bold] ({len(ips)} total)\n") + + if ipv4: + console.print(f"[cyan]IPv4[/cyan] ({len(ipv4)}):") + for ip in sorted(ipv4): + console.print(f" {ip}") + + if ipv6: + console.print(f"\n[cyan]IPv6[/cyan] ({len(ipv6)}):") + for ip in sorted(ipv6): + console.print(f" {ip}") diff --git a/src/nssec/cli/waf_commands.py b/src/nssec/cli/waf_commands.py index fedcfd0..fd3d4e7 100644 --- a/src/nssec/cli/waf_commands.py +++ b/src/nssec/cli/waf_commands.py @@ -49,7 +49,8 @@ def _display_install_plan(pf, mode, skip_evasive): table.add_row("security2.conf", sec2_state, sec2_action) if not skip_evasive: - table.add_row("mod_evasive", "", "install if missing") + evasive_action = "install + enable" if mode == "On" else "install config (disabled in DetectionOnly)" + table.add_row("mod_evasive", "", evasive_action) console.print(table) @@ -133,6 +134,12 @@ def _build_status_table(status): else: table.add_row("OWASP CRS", "[red]not installed[/red]") + if status.evasive_installed: + evasive_state = "[green]enabled[/green]" if status.evasive_enabled else "[yellow]disabled[/yellow]" + table.add_row("mod_evasive", evasive_state) + else: + table.add_row("mod_evasive", "[dim]not installed[/dim]") + table.add_row("NS exclusions", _yn(status.exclusions_present, "yellow")) table.add_row("Audit log", _yn(status.audit_log_exists, "dim")) return table @@ -218,6 +225,9 @@ def waf_enable(yes): "[bold yellow]Warning:[/bold yellow] Switching to blocking mode " "will actively reject requests that match ModSecurity rules." ) + console.print( + "This will also [bold]enable mod_evasive[/bold] (HTTP flood protection)." + ) console.print( "Ensure you have reviewed " "[cyan]/var/log/apache2/modsec_audit.log[/cyan] for false positives." @@ -236,6 +246,89 @@ def waf_enable(yes): raise SystemExit(1) +@waf.command("disable") +@click.option("--yes", "-y", is_flag=True, help="Skip confirmation") +def waf_disable(yes): + """Switch ModSecurity to DetectionOnly mode (logs but does not block).""" + from nssec.modules.waf import ModSecurityInstaller + + installer = ModSecurityInstaller() + pf = installer.preflight() + _require_root_and_modsec(pf, "sudo nssec waf disable") + + if pf.modsec_mode and pf.modsec_mode.lower() == "detectiononly": + console.print("[green]ModSecurity is already in DetectionOnly mode.[/green]") + return + + console.print( + "Switching to [cyan]DetectionOnly[/cyan] mode. " + "ModSecurity will log violations but not block requests." + ) + console.print( + "This will also [bold]disable mod_evasive[/bold] (HTTP flood protection)." + ) + console.print() + + if not yes and not click.confirm("Switch SecRuleEngine to DetectionOnly?"): + console.print("[yellow]Aborted.[/yellow]") + return + + result = installer.set_mode("DetectionOnly") + if result.success: + console.print(f"[green]{result.message}[/green]") + else: + console.print(f"[red]Error: {result.error}[/red]") + raise SystemExit(1) + + +@waf.command("remove") +@click.option("--yes", "-y", is_flag=True, help="Skip confirmation") +def waf_remove(yes): + """Disable the ModSecurity Apache module entirely. + + This disables the security2 module in Apache, effectively turning off + the WAF completely. Use 'nssec waf init' to re-enable. + """ + from nssec.modules.waf import ModSecurityInstaller + from nssec.modules.waf.utils import run_cmd, file_exists + from nssec.modules.waf.config import SECURITY2_LOAD + from nssec.core.ssh import is_root + + if not is_root(): + console.print("[red]Error: Must run as root (sudo nssec waf remove)[/red]") + raise SystemExit(1) + + if not file_exists(SECURITY2_LOAD): + console.print("[green]ModSecurity module is already disabled.[/green]") + return + + console.print( + "[bold yellow]Warning:[/bold yellow] This will completely disable " + "the ModSecurity WAF module." + ) + console.print() + + if not yes and not click.confirm("Disable ModSecurity module?"): + console.print("[yellow]Aborted.[/yellow]") + return + + _, stderr, rc = run_cmd(["a2dismod", "security2"]) + if rc != 0: + console.print(f"[red]Error:[/red] Failed to disable module: {stderr}") + raise SystemExit(1) + console.print("[green]Done:[/green] Disabled security2 module") + + _, stderr, rc = run_cmd(["systemctl", "reload", "apache2"]) + if rc != 0: + console.print(f"[red]Error:[/red] Apache reload failed: {stderr}") + raise SystemExit(1) + console.print("[green]Done:[/green] Apache reloaded") + + console.print() + console.print("ModSecurity is now disabled. To re-enable, run:") + console.print(" [cyan]sudo nssec waf init[/cyan]") + + @waf.command("update-exclusions") @click.option("--yes", "-y", is_flag=True, help="Skip confirmation") @click.option( diff --git a/src/nssec/modules/mtls/__init__.py b/src/nssec/modules/mtls/__init__.py index 71fe843..0d12ff6 100644 --- a/src/nssec/modules/mtls/__init__.py +++ b/src/nssec/modules/mtls/__init__.py @@ -1 +1,157 @@ -"""mTLS device provisioning module.""" +"""mTLS device provisioning module. + +Provides management of NodePing monitoring IPs for the ndp_mtls.conf +Apache configuration file used by mTLSProtect. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from nssec.modules.mtls.config import BACKUP_SUFFIX, NDP_MTLS_CONF +from nssec.modules.mtls.utils import ( + backup_file, + build_managed_section, + fetch_nodeping_ips, + file_exists, + find_requireany_block, + get_managed_section, + read_file, + run_cmd, + write_file, +) + + +@dataclass +class StepResult: + """Result of a single operation step.""" + + success: bool = True + skipped: bool = False + message: str = "" + error: str = "" + + +def get_current_nodeping_ips() -> list[str]: + """Get currently configured NodePing IPs from ndp_mtls.conf.""" + content = read_file(NDP_MTLS_CONF) + if not content: + return [] + _, _, ips = get_managed_section(content) + return ips + + +def _build_dry_run_message(ips: list[str], existing_ips: list[str]) -> str: + """Build a dry-run summary message.""" + added = set(ips) - set(existing_ips) + removed = set(existing_ips) - set(ips) + msg = f"Would update {len(ips)} NodePing IPs" + if existing_ips: + if added: + msg += f" (+{len(added)} new)" + if removed: + msg += f" (-{len(removed)} removed)" + return msg + + +def _insert_nodeping_section(content: str, new_section: str) -> tuple[str, str]: + """Insert or replace the NodePing section in config content. + + Returns (new_content, error_message). Error is empty on success. + """ + sec_start, sec_end, _ = get_managed_section(content) + + if sec_start != -1: + return content[:sec_start] + new_section + content[sec_end:], "" + + insert_pos = find_requireany_block(content) + if insert_pos == -1: + return "", "Could not find block in ndp_mtls.conf" + return content[:insert_pos] + new_section + content[insert_pos:], "" + + +def update_nodeping_ips(dry_run: bool = False) -> StepResult: + """Fetch NodePing IPs and update ndp_mtls.conf.""" + if not file_exists(NDP_MTLS_CONF): + return StepResult( + success=False, + error=f"{NDP_MTLS_CONF} not found. Is mTLSProtect installed?", + ) + + ips, fetch_error = fetch_nodeping_ips() + if fetch_error: + return StepResult(success=False, error=fetch_error) + if not ips: + return StepResult(success=False, error="No valid IPs found in NodePing list") + + content = read_file(NDP_MTLS_CONF) + if not content: + return StepResult(success=False, error=f"Failed to read {NDP_MTLS_CONF}") + + _, _, existing_ips = get_managed_section(content) + if set(existing_ips) == set(ips): + return StepResult(skipped=True, message="NodePing IPs already up to date") + + if dry_run: + return StepResult(message=_build_dry_run_message(ips, existing_ips)) + + new_section = build_managed_section(ips) + new_content, error = _insert_nodeping_section(content, new_section) + if error: + return StepResult(success=False, error=error) + + backup_file(NDP_MTLS_CONF) + if not write_file(NDP_MTLS_CONF, new_content): + return StepResult(success=False, error=f"Failed to write {NDP_MTLS_CONF}") + + return StepResult(message=f"Updated {len(ips)} NodePing IPs in {NDP_MTLS_CONF}") + + +def remove_nodeping_ips() -> StepResult: + """Remove the nssec-managed NodePing section from ndp_mtls.conf.""" + if not file_exists(NDP_MTLS_CONF): + return StepResult(skipped=True, message=f"{NDP_MTLS_CONF} not found") + + content = read_file(NDP_MTLS_CONF) + if not content: + return StepResult(success=False, error=f"Failed to read {NDP_MTLS_CONF}") + + sec_start, sec_end, _ = get_managed_section(content) + if sec_start == -1: + return StepResult(skipped=True, message="No NodePing section found to remove") + + # Remove the section + new_content = content[:sec_start] + content[sec_end:] + + backup_file(NDP_MTLS_CONF) + if not write_file(NDP_MTLS_CONF, new_content): + return StepResult(success=False, error=f"Failed to write {NDP_MTLS_CONF}") + + return StepResult(message="Removed NodePing IPs section from ndp_mtls.conf") + + +def validate_apache_config() -> StepResult: + """Run apache2ctl configtest to validate configuration.""" + stdout, stderr, rc = run_cmd(["apache2ctl", "configtest"]) + if rc != 0: + return StepResult(success=False, error=f"Apache config test failed: {stderr or stdout}") + return StepResult(message="Apache config test passed") + + +def reload_apache() -> StepResult: + """Reload Apache to apply configuration changes.""" + _, stderr, rc = run_cmd(["systemctl", "reload", "apache2"]) + if rc != 0: + return StepResult(success=False, error=f"Apache reload failed: {stderr}") + return StepResult(message="Apache reloaded") + + +def rollback(path: str = NDP_MTLS_CONF) -> bool: + """Restore a file from its backup.""" + import shutil + + backup = path + BACKUP_SUFFIX + if file_exists(backup): + shutil.copy2(backup, path) + return True + return False diff --git a/src/nssec/modules/mtls/config.py b/src/nssec/modules/mtls/config.py new file mode 100644 index 0000000..ff285ef --- /dev/null +++ b/src/nssec/modules/mtls/config.py @@ -0,0 +1,14 @@ +"""mTLS module configuration constants.""" + +# NodePing IP list URL +NODEPING_URL = "https://nodeping.com/content/txt/pinghosts.txt" + +# Target configuration file +NDP_MTLS_CONF = "/etc/apache2/conf.d/ndp_mtls.conf" + +# Backup suffix (consistent with WAF module) +BACKUP_SUFFIX = ".bak.nssec" + +# Marker comments for managed section +NODEPING_BEGIN_MARKER = "# BEGIN nssec-managed NodePing IPs (do not edit)" +NODEPING_END_MARKER = "# END nssec-managed NodePing IPs" diff --git a/src/nssec/modules/mtls/utils.py b/src/nssec/modules/mtls/utils.py new file mode 100644 index 0000000..45a64c3 --- /dev/null +++ b/src/nssec/modules/mtls/utils.py @@ -0,0 +1,165 @@ +"""Utility functions for the mTLS module.""" + +from __future__ import annotations + +import shutil +from datetime import datetime, timezone +from pathlib import Path + +from nssec.core import ssh +from nssec.core.validators import validate_ip_address +from nssec.modules.mtls.config import ( + BACKUP_SUFFIX, + NODEPING_BEGIN_MARKER, + NODEPING_END_MARKER, + NODEPING_URL, +) + + +def run_cmd(cmd: list[str], timeout: int = 120) -> tuple[str, str, int]: + """Run a command and return (stdout, stderr, returncode). + + Uses SSH-aware execution - works locally or remotely. + """ + return ssh.run_command(cmd, timeout) + + +def file_exists(path: str) -> bool: + """Check if a file exists. SSH-aware.""" + return ssh.file_exists(path) + + +def read_file(path: str) -> str | None: + """Read a file. SSH-aware.""" + return ssh.read_file(path) + + +def backup_file(path: str) -> str | None: + """Create a backup of a file. Returns backup path or None.""" + if not file_exists(path): + return None + backup = path + BACKUP_SUFFIX + if file_exists(backup): + return backup # Already backed up from a previous run + shutil.copy2(path, backup) + return backup + + +def write_file(path: str, content: str) -> bool: + """Write content to a file, creating parent dirs as needed.""" + try: + Path(path).parent.mkdir(parents=True, exist_ok=True) + Path(path).write_text(content) + return True + except OSError: + return False + + +def fetch_nodeping_ips() -> tuple[list[str], str]: + """Fetch NodePing IPs from their published list. + + Returns: + Tuple of (list of IPs, error message or empty string) + """ + stdout, stderr, rc = run_cmd(["curl", "-sL", "--max-time", "30", NODEPING_URL]) + if rc != 0: + return [], f"Failed to fetch NodePing IPs: {stderr}" + return parse_ip_list(stdout), "" + + +def parse_ip_list(content: str) -> list[str]: + """Parse plain text IP list. + + Handles formats: + - One IP per line: "192.168.1.1" + - Hostname and IP: "hostname.example.com 192.168.1.1" + """ + ips = [] + for line in content.strip().splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + + # Handle "hostname IP" format (NodePing uses this) + parts = line.split() + candidates = parts if len(parts) > 1 else [line] + + for candidate in candidates: + try: + validate_ip_address(candidate) + ips.append(candidate) + break # Only take one IP per line + except ValueError: + continue + return ips + + +def get_managed_section(content: str) -> tuple[int, int, list[str]]: + """Find and parse the nssec-managed NodePing section. + + Returns: + Tuple of (start_pos, end_pos, list of IPs) or (-1, -1, []) if not found + """ + begin_idx = content.find(NODEPING_BEGIN_MARKER) + if begin_idx == -1: + return -1, -1, [] + + end_idx = content.find(NODEPING_END_MARKER, begin_idx) + if end_idx == -1: + return -1, -1, [] + + # Extract IPs from the section + section = content[begin_idx:end_idx] + ips = [] + for line in section.splitlines(): + stripped = line.strip() + if stripped.startswith("Require ip "): + ip = stripped.replace("Require ip ", "").strip() + ips.append(ip) + + return begin_idx, end_idx + len(NODEPING_END_MARKER), ips + + +def build_managed_section(ips: list[str]) -> str: + """Build the managed section content with marker comments.""" + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + lines = [ + "", + f" {NODEPING_BEGIN_MARKER}", + f" # Updated: {timestamp}", + f" # Source: {NODEPING_URL}", + f" # Count: {len(ips)} IPs", + ] + for ip in sorted(ips): + lines.append(f" Require ip {ip}") + lines.append(f" {NODEPING_END_MARKER}") + lines.append("") + return "\n".join(lines) + + +def find_requireany_block(content: str) -> int: + """Find the position of the first block to insert NodePing IPs. + + Returns the position after the opening tag, or -1 if not found. + """ + # Look for the outermost in the block + loc_start = content.find("", loc_start) + if loc_end == -1: + return -1 + + # Find first within the Location block + loc_content = content[loc_start:loc_end] + req_any_pos = loc_content.find("") + if req_any_pos == -1: + return -1 + + # Return absolute position after \n + abs_pos = loc_start + req_any_pos + len("") + # Skip past the newline if present + if abs_pos < len(content) and content[abs_pos] == "\n": + abs_pos += 1 + return abs_pos diff --git a/src/nssec/modules/waf/__init__.py b/src/nssec/modules/waf/__init__.py index b62e1b8..dad8a63 100644 --- a/src/nssec/modules/waf/__init__.py +++ b/src/nssec/modules/waf/__init__.py @@ -19,6 +19,10 @@ CRS_INSTALL_DIR, CRS_SEARCH_PATHS, CRS_SETUP_OVERRIDES_TEMPLATE, + EVASIVE_CONF, + EVASIVE_CONF_TEMPLATE, + EVASIVE_LOAD, + EVASIVE_LOG_DIR, EVASIVE_PACKAGE, MODSEC_AUDIT_LOG, MODSEC_CONF, @@ -56,7 +60,12 @@ def get_allowlisted_ips() -> list[str]: content = read_file(NS_EXCLUSIONS_CONF) if not content: return [] - return re.findall(r'id:10001\d+.*?@ipMatch\s+([^\s"]+)', content, re.DOTALL) + # Match SecRule with @ipMatch followed by id:10001xx (admin IP rules) + return re.findall( + r'SecRule REMOTE_ADDR "@ipMatch\s+([^\s"]+)"[^"]*"id:10001\d+', + content, + re.DOTALL, + ) def add_allowlisted_ip(ip: str) -> StepResult: @@ -191,7 +200,12 @@ def install_packages(self) -> StepResult: return StepResult(message=f"Installed: {', '.join(packages)}") def enable_modules(self) -> StepResult: - """Enable Apache security2 module.""" + """Enable Apache security2 module and conditionally enable evasive. + + mod_evasive is only enabled when the WAF mode is 'On' (blocking). + In DetectionOnly mode, evasive is left disabled to avoid blocking + traffic while the WAF is still being tuned. + """ if file_exists(SECURITY2_LOAD): return StepResult(skipped=True, message="security2 module already enabled") if self.dry_run: @@ -203,7 +217,8 @@ def enable_modules(self) -> StepResult: success=False, error=f"a2enmod security2 failed: {stderr}", ) - if self.install_evasive: + # Only enable evasive in blocking mode; DetectionOnly = passive + if self.install_evasive and self.mode == "On": run_cmd(["a2enmod", "evasive"]) return StepResult(message="Enabled security2 module") @@ -233,6 +248,62 @@ def setup_config(self) -> StepResult: msg = f"Configured ModSecurity (SecRuleEngine {self.mode})" return StepResult(message=msg) + def setup_evasive_config(self) -> StepResult: + """Write the mod_evasive configuration with tuned thresholds. + + Deploys a baseline evasive.conf with thresholds tuned for + NetSapiens traffic patterns and whitelists for localhost/internal IPs. + """ + if not self.install_evasive: + return StepResult(skipped=True, message="Evasive installation skipped") + if self.dry_run: + return StepResult(message=f"Would write {EVASIVE_CONF}") + + if file_exists(EVASIVE_CONF): + backup_file(EVASIVE_CONF) + + content = render(EVASIVE_CONF_TEMPLATE, log_dir=EVASIVE_LOG_DIR) + if not write_file(EVASIVE_CONF, content): + return StepResult(success=False, error=f"Failed to write {EVASIVE_CONF}") + + Path(EVASIVE_LOG_DIR).mkdir(parents=True, exist_ok=True) + return StepResult(message=f"Configured mod_evasive ({EVASIVE_CONF})") + + def set_evasive_state(self, enable: bool) -> StepResult: + """Enable or disable the mod_evasive Apache module. + + mod_evasive has no detection-only mode, so we toggle the module + itself: enabled when WAF is in blocking mode, disabled when in + DetectionOnly mode. + """ + if not package_installed(EVASIVE_PACKAGE): + return StepResult( + skipped=True, + message="mod_evasive not installed, skipping", + ) + + currently_enabled = file_exists(EVASIVE_LOAD) + if enable and currently_enabled: + return StepResult(skipped=True, message="mod_evasive already enabled") + if not enable and not currently_enabled: + return StepResult(skipped=True, message="mod_evasive already disabled") + + if self.dry_run: + action = "enable" if enable else "disable" + return StepResult(message=f"Would {action} mod_evasive") + + cmd = ["a2enmod", "evasive"] if enable else ["a2dismod", "evasive"] + _, stderr, rc = run_cmd(cmd) + if rc != 0: + action = "enable" if enable else "disable" + return StepResult( + success=False, + error=f"Failed to {action} mod_evasive: {stderr}", + ) + + action = "Enabled" if enable else "Disabled" + return StepResult(message=f"{action} mod_evasive") + def install_crs_v4(self) -> StepResult: """Install OWASP CRS v4, downloading from GitHub if apt has v3.""" pf = self._preflight or self.preflight() @@ -382,7 +453,7 @@ def reload_apache(self) -> StepResult: def _rollback(self) -> None: """Restore .bak.nssec backups for all managed config files.""" - for path in [MODSEC_CONF, SECURITY2_CONF, NS_EXCLUSIONS_CONF]: + for path in [MODSEC_CONF, SECURITY2_CONF, NS_EXCLUSIONS_CONF, EVASIVE_CONF]: bak = path + BACKUP_SUFFIX if file_exists(bak): shutil.copy2(bak, path) @@ -434,10 +505,13 @@ def run(self, admin_ips: list[str] | None = None) -> InstallResult: if pf.apache_installed and not pf.apache_running: result.warnings.append("Apache2 is installed but not running") + evasive_enable = self.mode == "On" steps = [ ("Install packages", self.install_packages), ("Enable Apache modules", self.enable_modules), ("Configure ModSecurity", self.setup_config), + ("Configure mod_evasive", self.setup_evasive_config), + ("Set mod_evasive state", lambda: self.set_evasive_state(evasive_enable)), ("Install OWASP CRS v4", self.install_crs_v4), ("Install NS exclusions", lambda: self.install_exclusions(admin_ips)), ("Update security2.conf", self.write_security2_conf), @@ -462,7 +536,12 @@ def run(self, admin_ips: list[str] | None = None) -> InstallResult: # ------------------------------------------------------------------ def set_mode(self, mode: str) -> StepResult: - """Change SecRuleEngine mode in the live config.""" + """Change SecRuleEngine mode and toggle mod_evasive accordingly. + + When switching to 'On' (blocking), mod_evasive is enabled. + When switching to 'DetectionOnly', mod_evasive is disabled so it + does not block traffic while the WAF is still being tuned. + """ content = read_file(MODSEC_CONF) if not content: return StepResult(success=False, error=f"{MODSEC_CONF} not found") @@ -486,6 +565,10 @@ def set_mode(self, mode: str) -> StepResult: if not write_file(MODSEC_CONF, "\n".join(new_lines) + "\n"): return StepResult(success=False, error=f"Failed to write {MODSEC_CONF}") + # Toggle mod_evasive: enabled in blocking mode, disabled in detect mode + evasive_enable = mode == "On" + evasive_result = self.set_evasive_state(evasive_enable) + stdout, stderr, rc = run_cmd(["apache2ctl", "configtest"]) if rc != 0: self._rollback() @@ -495,5 +578,15 @@ def set_mode(self, mode: str) -> StepResult: _, stderr, rc = run_cmd(["systemctl", "reload", "apache2"]) if rc != 0: return StepResult(success=False, error=f"Apache reload failed: {stderr}") - msg = f"SecRuleEngine set to {mode} and Apache reloaded" + + evasive_state = "enabled" if evasive_enable else "disabled" + evasive_note = "" + if evasive_result.skipped: + evasive_note = f" (mod_evasive: {evasive_result.message})" + elif evasive_result.success: + evasive_note = f" (mod_evasive {evasive_state})" + else: + evasive_note = f" (warning: mod_evasive toggle failed: {evasive_result.error})" + + msg = f"SecRuleEngine set to {mode} and Apache reloaded{evasive_note}" return StepResult(message=msg) diff --git a/src/nssec/modules/waf/config.py b/src/nssec/modules/waf/config.py index 3954595..25a2c30 100644 --- a/src/nssec/modules/waf/config.py +++ b/src/nssec/modules/waf/config.py @@ -4,6 +4,9 @@ MODSEC_PACKAGE = "libapache2-mod-security2" CRS_APT_PACKAGE = "modsecurity-crs" EVASIVE_PACKAGE = "libapache2-mod-evasive" +EVASIVE_CONF = "/etc/apache2/mods-available/evasive.conf" +EVASIVE_LOAD = "/etc/apache2/mods-enabled/evasive.load" +EVASIVE_LOG_DIR = "/var/log/apache2/mod_evasive" # CRS version pinning (used when apt ships v3.x) PINNED_CRS_VERSION = "4.8.0" @@ -298,3 +301,59 @@ t:none,\\ setvar:'tx.allowed_request_content_type=|application/x-www-form-urlencoded| |multipart/form-data| |multipart/related| |multipart/mixed| |text/xml| |application/xml| |application/soap+xml| |application/json| |application/cloudevents+json| |application/cloudevents-batch+json|'" """ + +EVASIVE_CONF_TEMPLATE = """\ +# mod_evasive Configuration +# Managed by nssec +# Generated: {{ timestamp }} +# +# HTTP flood / DDoS protection for Apache. +# Thresholds tuned for NetSapiens traffic patterns (~270 req/s sustained, +# peak ~318 req/s across hundreds of IPs). + + + # Hash table size — prime number with headroom above expected unique IPs + DOSHashTableSize 3097 + + # Max requests to the same page per interval before blocking + # Tightened from default (2) — scanner patterns warrant 15/s per-page + DOSPageCount 15 + + # Max total requests from one IP per interval before blocking + # Peak burst is ~318 req/s total across ~372 IPs; 60/s per-IP is generous + DOSSiteCount 60 + + # Sliding window intervals (seconds) + DOSPageInterval 1 + DOSSiteInterval 1 + + # How long (seconds) an IP is blocked once a threshold is hit + # 60s block breaks automated scanner loops without permanent impact + DOSBlockingPeriod 60 + + # Log blocked IPs here + DOSLogDir {{ log_dir }} + + # Whitelist RFC 1918 private ranges and loopback to avoid false positives + # on internal NS service traffic and cluster communication + DOSWhitelist 127.0.0.1 + DOSWhitelist 10.*.*.* + DOSWhitelist 172.16.*.* + DOSWhitelist 172.17.*.* + DOSWhitelist 172.18.*.* + DOSWhitelist 172.19.*.* + DOSWhitelist 172.20.*.* + DOSWhitelist 172.21.*.* + DOSWhitelist 172.22.*.* + DOSWhitelist 172.23.*.* + DOSWhitelist 172.24.*.* + DOSWhitelist 172.25.*.* + DOSWhitelist 172.26.*.* + DOSWhitelist 172.27.*.* + DOSWhitelist 172.28.*.* + DOSWhitelist 172.29.*.* + DOSWhitelist 172.30.*.* + DOSWhitelist 172.31.*.* + DOSWhitelist 192.168.*.* + +""" diff --git a/src/nssec/modules/waf/status.py b/src/nssec/modules/waf/status.py index d7fd777..22d089d 100644 --- a/src/nssec/modules/waf/status.py +++ b/src/nssec/modules/waf/status.py @@ -8,6 +8,8 @@ from nssec.modules.waf.config import ( CRS_SEARCH_PATHS, + EVASIVE_LOAD, + EVASIVE_PACKAGE, MODSEC_AUDIT_LOG, MODSEC_CONF, MODSEC_PACKAGE, @@ -26,6 +28,8 @@ class WafStatus: crs_installed: bool = False crs_version: Optional[str] = None crs_path: Optional[str] = None + evasive_installed: bool = False + evasive_enabled: bool = False exclusions_present: bool = False audit_log_exists: bool = False recent_log_lines: list[str] = field(default_factory=list) @@ -91,6 +95,9 @@ def get_waf_status() -> WafStatus: status.crs_version = version_file.read_text().strip() break + status.evasive_installed = _pkg_installed(EVASIVE_PACKAGE) + status.evasive_enabled = Path(EVASIVE_LOAD).exists() + status.exclusions_present = Path(NS_EXCLUSIONS_CONF).exists() status.audit_log_exists = Path(MODSEC_AUDIT_LOG).exists() if status.audit_log_exists: diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..5c64131 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,58 @@ +"""Shared pytest fixtures for nssec tests.""" + +import pytest +from unittest.mock import MagicMock, patch + + +@pytest.fixture +def mock_run_cmd(): + """Mock run_cmd to avoid executing real system commands.""" + with patch("nssec.modules.waf.utils.run_cmd") as mock: + mock.return_value = ("", "", 0) + yield mock + + +@pytest.fixture +def mock_file_ops(): + """Mock file operations for WAF module. + + Patches at the point of use (nssec.modules.waf) not definition (utils). + """ + with patch("nssec.modules.waf.file_exists") as exists, \ + patch("nssec.modules.waf.read_file") as read, \ + patch("nssec.modules.waf.write_file") as write, \ + patch("nssec.modules.waf.backup_file") as backup, \ + patch("nssec.modules.waf.render") as render_mock: + exists.return_value = True + read.return_value = "" + write.return_value = True + backup.return_value = True + render_mock.return_value = "rendered content" + yield { + "exists": exists, + "read": read, + "write": write, + "backup": backup, + "render": render_mock, + } + + +@pytest.fixture +def mock_preflight(): + """Mock preflight result for CLI tests.""" + from nssec.modules.waf.types import PreflightResult + + pf = PreflightResult() + pf.is_root = True + pf.apache_installed = True + pf.modsec_installed = True + pf.modsec_enabled = True + pf.modsec_mode = "On" + return pf + + +@pytest.fixture +def cli_runner(): + """Click CLI test runner.""" + from click.testing import CliRunner + return CliRunner() diff --git a/tests/unit/test_mtls_module.py b/tests/unit/test_mtls_module.py new file mode 100644 index 0000000..b5aa73d --- /dev/null +++ b/tests/unit/test_mtls_module.py @@ -0,0 +1,165 @@ +"""Tests for mTLS module functions.""" + +from nssec.modules.mtls.config import NODEPING_BEGIN_MARKER, NODEPING_END_MARKER +from nssec.modules.mtls.utils import ( + build_managed_section, + find_requireany_block, + get_managed_section, + parse_ip_list, +) + + +class TestParseIpList: + """Tests for parse_ip_list function.""" + + def test_parses_ipv4_addresses(self): + content = "192.168.1.1\n10.0.0.1\n172.16.0.1" + result = parse_ip_list(content) + assert result == ["192.168.1.1", "10.0.0.1", "172.16.0.1"] + + def test_parses_ipv6_addresses(self): + content = "2001:db8::1\n::1\nfe80::1" + result = parse_ip_list(content) + assert len(result) == 3 + assert "2001:db8::1" in result + assert "::1" in result + + def test_skips_comments_and_empty_lines(self): + content = "# Comment\n\n192.168.1.1\n\n# Another comment\n10.0.0.1" + result = parse_ip_list(content) + assert result == ["192.168.1.1", "10.0.0.1"] + + def test_skips_invalid_ips(self): + content = "192.168.1.1\nnot-an-ip\n10.0.0.1" + result = parse_ip_list(content) + assert result == ["192.168.1.1", "10.0.0.1"] + + def test_handles_whitespace(self): + content = " 192.168.1.1 \n\t10.0.0.1\t" + result = parse_ip_list(content) + assert result == ["192.168.1.1", "10.0.0.1"] + + def test_empty_content(self): + result = parse_ip_list("") + assert result == [] + + def test_parses_hostname_ip_format(self): + """NodePing uses 'hostname IP' format.""" + content = "pinghostca.nodeping.com 104.247.192.170\npinghostaz.nodeping.com 38.114.123.177" + result = parse_ip_list(content) + assert result == ["104.247.192.170", "38.114.123.177"] + + def test_parses_hostname_ipv6_format(self): + content = "pinghostca.nodeping.com 2607:3f00:11:21::10" + result = parse_ip_list(content) + assert result == ["2607:3f00:11:21::10"] + + +class TestGetManagedSection: + """Tests for get_managed_section function.""" + + def test_finds_existing_section(self): + content = f""" + + Require ip 127.0.0.1 + {NODEPING_BEGIN_MARKER} + Require ip 1.2.3.4 + Require ip 5.6.7.8 + {NODEPING_END_MARKER} + +""" + start, end, ips = get_managed_section(content) + assert start > 0 + assert end > start + assert ips == ["1.2.3.4", "5.6.7.8"] + + def test_returns_empty_when_no_section(self): + content = """ + + Require ip 127.0.0.1 + +""" + start, end, ips = get_managed_section(content) + assert start == -1 + assert end == -1 + assert ips == [] + + def test_handles_missing_end_marker(self): + content = f""" + + {NODEPING_BEGIN_MARKER} + Require ip 1.2.3.4 + +""" + start, end, ips = get_managed_section(content) + assert start == -1 + assert end == -1 + assert ips == [] + + +class TestBuildManagedSection: + """Tests for build_managed_section function.""" + + def test_builds_section_with_markers(self): + ips = ["1.2.3.4", "5.6.7.8"] + result = build_managed_section(ips) + + assert NODEPING_BEGIN_MARKER in result + assert NODEPING_END_MARKER in result + assert "Require ip 1.2.3.4" in result + assert "Require ip 5.6.7.8" in result + + def test_includes_metadata(self): + ips = ["1.2.3.4"] + result = build_managed_section(ips) + + assert "Updated:" in result + assert "Source:" in result + assert "Count: 1 IPs" in result + + def test_sorts_ips(self): + ips = ["5.6.7.8", "1.2.3.4", "9.10.11.12"] + result = build_managed_section(ips) + + # Find positions of IPs in output + pos_1 = result.find("1.2.3.4") + pos_5 = result.find("5.6.7.8") + pos_9 = result.find("9.10.11.12") + + assert pos_1 < pos_5 < pos_9 + + +class TestFindRequireanyBlock: + """Tests for find_requireany_block function.""" + + def test_finds_requireany_in_location(self): + content = """ + + SSLVerifyClient require + + Require ip 127.0.0.1 + + +""" + pos = find_requireany_block(content) + assert pos > 0 + # Position should be right after \n + assert content[pos : pos + 8] == " " # indentation of next line + + def test_returns_negative_when_no_location(self): + content = """ + + Require ip 127.0.0.1 + +""" + pos = find_requireany_block(content) + assert pos == -1 + + def test_returns_negative_when_no_requireany(self): + content = """ + + Require ip 127.0.0.1 + +""" + pos = find_requireany_block(content) + assert pos == -1 diff --git a/tests/unit/test_waf_commands.py b/tests/unit/test_waf_commands.py new file mode 100644 index 0000000..2f8e70f --- /dev/null +++ b/tests/unit/test_waf_commands.py @@ -0,0 +1,219 @@ +"""Tests for WAF CLI commands.""" + +import pytest +from unittest.mock import patch, MagicMock +from click.testing import CliRunner + +from nssec.cli.waf_commands import waf + + +@pytest.fixture +def runner(): + """Click CLI test runner.""" + return CliRunner() + + +@pytest.fixture +def mock_installer(): + """Mock ModSecurityInstaller for CLI tests.""" + with patch("nssec.modules.waf.ModSecurityInstaller") as mock_class: + installer = MagicMock() + mock_class.return_value = installer + + # Default preflight result + pf = MagicMock() + pf.is_root = True + pf.modsec_installed = True + pf.modsec_enabled = True + pf.modsec_mode = "On" + installer.preflight.return_value = pf + + # Default step result + step = MagicMock() + step.success = True + step.message = "Success" + installer.set_mode.return_value = step + installer.validate_config.return_value = step + installer.reload_apache.return_value = step + + yield installer + + +class TestWafDisable: + """Tests for waf disable command.""" + + def test_switches_to_detectiononly(self, runner, mock_installer): + """Should switch to DetectionOnly mode.""" + result = runner.invoke(waf, ["disable", "-y"]) + + assert result.exit_code == 0 + mock_installer.set_mode.assert_called_once_with("DetectionOnly") + + def test_skips_if_already_detectiononly(self, runner, mock_installer): + """Should skip if already in DetectionOnly mode.""" + mock_installer.preflight.return_value.modsec_mode = "DetectionOnly" + + result = runner.invoke(waf, ["disable", "-y"]) + + assert result.exit_code == 0 + assert "already in DetectionOnly" in result.output + mock_installer.set_mode.assert_not_called() + + def test_requires_root(self, runner, mock_installer): + """Should fail if not root.""" + mock_installer.preflight.return_value.is_root = False + + result = runner.invoke(waf, ["disable", "-y"]) + + assert result.exit_code == 1 + assert "root" in result.output.lower() + + def test_requires_modsec_installed(self, runner, mock_installer): + """Should fail if ModSecurity not installed.""" + mock_installer.preflight.return_value.modsec_installed = False + + result = runner.invoke(waf, ["disable", "-y"]) + + assert result.exit_code == 1 + assert "not installed" in result.output.lower() + + def test_prompts_without_yes_flag(self, runner, mock_installer): + """Should prompt for confirmation without -y flag.""" + result = runner.invoke(waf, ["disable"], input="n\n") + + assert "Aborted" in result.output + mock_installer.set_mode.assert_not_called() + + +class TestWafRemove: + """Tests for waf remove command.""" + + def test_disables_security2_module(self, runner): + """Should disable security2 Apache module.""" + with patch("nssec.core.ssh.is_root", return_value=True), \ + patch("nssec.modules.waf.utils.file_exists", return_value=True), \ + patch("nssec.modules.waf.utils.run_cmd") as mock_run: + mock_run.return_value = ("", "", 0) + + result = runner.invoke(waf, ["remove", "-y"]) + + assert result.exit_code == 0 + # Check a2dismod was called + calls = [str(c) for c in mock_run.call_args_list] + assert any("a2dismod" in c for c in calls) + + def test_skips_if_already_disabled(self, runner): + """Should skip if module already disabled.""" + with patch("nssec.core.ssh.is_root", return_value=True), \ + patch("nssec.modules.waf.utils.file_exists", return_value=False): + result = runner.invoke(waf, ["remove", "-y"]) + + assert result.exit_code == 0 + assert "already disabled" in result.output + + def test_requires_root(self, runner): + """Should fail if not root.""" + with patch("nssec.core.ssh.is_root", return_value=False): + result = runner.invoke(waf, ["remove", "-y"]) + + assert result.exit_code == 1 + assert "root" in result.output.lower() + + def test_prompts_without_yes_flag(self, runner): + """Should prompt for confirmation without -y flag.""" + with patch("nssec.core.ssh.is_root", return_value=True), \ + patch("nssec.modules.waf.utils.file_exists", return_value=True): + result = runner.invoke(waf, ["remove"], input="n\n") + + assert "Aborted" in result.output + + +class TestWafAllowlistAdd: + """Tests for waf allowlist add command.""" + + def test_adds_ip_to_allowlist(self, runner, mock_installer): + """Should add IP to allowlist.""" + with patch("nssec.modules.waf.get_allowlisted_ips", return_value=[]), \ + patch("nssec.modules.waf.add_allowlisted_ip") as mock_add: + mock_add.return_value = MagicMock(success=True, message="Added") + + result = runner.invoke(waf, ["allowlist", "add", "192.168.1.100", "-y"]) + + assert result.exit_code == 0 + mock_add.assert_called_once_with("192.168.1.100") + + def test_skips_duplicate_ip(self, runner, mock_installer): + """Should skip if IP already allowlisted.""" + with patch("nssec.modules.waf.get_allowlisted_ips", return_value=["192.168.1.100"]): + result = runner.invoke(waf, ["allowlist", "add", "192.168.1.100", "-y"]) + + assert result.exit_code == 0 + assert "already allowlisted" in result.output + + def test_requires_root(self, runner, mock_installer): + """Should fail if not root.""" + mock_installer.preflight.return_value.is_root = False + + result = runner.invoke(waf, ["allowlist", "add", "192.168.1.100", "-y"]) + + assert result.exit_code == 1 + + +class TestWafAllowlistDelete: + """Tests for waf allowlist delete command.""" + + def test_removes_ip_from_allowlist(self, runner, mock_installer): + """Should remove IP from allowlist.""" + with patch("nssec.modules.waf.get_allowlisted_ips", return_value=["192.168.1.100"]), \ + patch("nssec.modules.waf.remove_allowlisted_ip") as mock_remove: + mock_remove.return_value = MagicMock(success=True, message="Removed") + + result = runner.invoke(waf, ["allowlist", "delete", "192.168.1.100", "-y"]) + + assert result.exit_code == 0 + mock_remove.assert_called_once_with("192.168.1.100") + + def test_skips_nonexistent_ip(self, runner, mock_installer): + """Should skip if IP not in allowlist.""" + with patch("nssec.modules.waf.get_allowlisted_ips", return_value=[]): + result = runner.invoke(waf, ["allowlist", "delete", "192.168.1.100", "-y"]) + + assert result.exit_code == 0 + assert "not in the allowlist" in result.output + + def test_requires_root(self, runner, mock_installer): + """Should fail if not root.""" + mock_installer.preflight.return_value.is_root = False + + result = runner.invoke(waf, ["allowlist", "delete", "192.168.1.100", "-y"]) + + assert result.exit_code == 1 + + +class TestWafAllowlistShow: + """Tests for waf allowlist show command.""" + + def test_shows_allowlisted_ips(self, runner): + """Should display allowlisted IPs.""" + with patch("nssec.modules.waf.get_allowlisted_ips", return_value=["192.168.1.100", "10.0.0.0/8"]): + result = runner.invoke(waf, ["allowlist", "show"]) + + assert result.exit_code == 0 + assert "192.168.1.100" in result.output + assert "10.0.0.0/8" in result.output + + def test_shows_empty_message(self, runner): + """Should show message when no IPs allowlisted.""" + with patch("nssec.modules.waf.get_allowlisted_ips", return_value=[]): + result = runner.invoke(waf, ["allowlist", "show"]) + + assert result.exit_code == 0 + assert "No IPs" in result.output + + def test_default_subcommand_shows_list(self, runner): + """Running 'waf allowlist' without subcommand should show list.""" + with patch("nssec.modules.waf.get_allowlisted_ips", return_value=["192.168.1.100"]): + result = runner.invoke(waf, ["allowlist"]) + + assert result.exit_code == 0 + assert "192.168.1.100" in result.output diff --git a/tests/unit/test_waf_module.py b/tests/unit/test_waf_module.py new file mode 100644 index 0000000..b67d400 --- /dev/null +++ b/tests/unit/test_waf_module.py @@ -0,0 +1,398 @@ +"""Tests for WAF module functions.""" + +import pytest +from unittest.mock import patch, MagicMock, call +from jinja2 import Template + + +class TestGetAllowlistedIps: + """Tests for get_allowlisted_ips function.""" + + def test_returns_empty_list_when_no_config(self, mock_file_ops): + """Should return empty list when exclusions config doesn't exist.""" + from nssec.modules.waf import get_allowlisted_ips + + mock_file_ops["read"].return_value = None + result = get_allowlisted_ips() + assert result == [] + + def test_returns_empty_list_when_no_ips(self, mock_file_ops): + """Should return empty list when no IPs in config.""" + from nssec.modules.waf import get_allowlisted_ips + + mock_file_ops["read"].return_value = """ +# Some config without allowlisted IPs +SecRule REQUEST_URI "@beginsWith /cfg/" "id:1000004" +""" + result = get_allowlisted_ips() + assert result == [] + + def test_parses_single_ip(self, mock_file_ops): + """Should parse a single allowlisted IP.""" + from nssec.modules.waf import get_allowlisted_ips + + mock_file_ops["read"].return_value = """ +SecRule REMOTE_ADDR "@ipMatch 192.168.1.100" "id:1000101,phase:1,pass" +""" + result = get_allowlisted_ips() + assert result == ["192.168.1.100"] + + def test_parses_multiple_ips(self, mock_file_ops): + """Should parse multiple allowlisted IPs.""" + from nssec.modules.waf import get_allowlisted_ips + + mock_file_ops["read"].return_value = """ +SecRule REMOTE_ADDR "@ipMatch 192.168.1.100" "id:1000101,phase:1,pass" +SecRule REMOTE_ADDR "@ipMatch 10.0.0.0/8" "id:1000102,phase:1,pass" +SecRule REMOTE_ADDR "@ipMatch 74.219.23.50" "id:1000103,phase:1,pass" +""" + result = get_allowlisted_ips() + assert result == ["192.168.1.100", "10.0.0.0/8", "74.219.23.50"] + + def test_parses_cidr_notation(self, mock_file_ops): + """Should correctly parse CIDR notation IPs.""" + from nssec.modules.waf import get_allowlisted_ips + + mock_file_ops["read"].return_value = """ +SecRule REMOTE_ADDR "@ipMatch 10.0.0.0/8" "id:1000101,phase:1,pass" +""" + result = get_allowlisted_ips() + assert result == ["10.0.0.0/8"] + + +class TestAddAllowlistedIp: + """Tests for add_allowlisted_ip function.""" + + def test_adds_new_ip(self, mock_file_ops): + """Should add a new IP to the allowlist.""" + from nssec.modules.waf import add_allowlisted_ip + + mock_file_ops["read"].return_value = "" + result = add_allowlisted_ip("192.168.1.100") + + assert result.success + assert "192.168.1.100" in result.message + mock_file_ops["write"].assert_called_once() + + def test_skips_duplicate_ip(self, mock_file_ops): + """Should skip if IP already in allowlist.""" + from nssec.modules.waf import add_allowlisted_ip + + mock_file_ops["read"].return_value = """ +SecRule REMOTE_ADDR "@ipMatch 192.168.1.100" "id:1000101,phase:1,pass" +""" + result = add_allowlisted_ip("192.168.1.100") + + assert result.skipped + mock_file_ops["write"].assert_not_called() + + def test_creates_backup_before_write(self, mock_file_ops): + """Should backup existing config before writing.""" + from nssec.modules.waf import add_allowlisted_ip + + mock_file_ops["read"].return_value = "" + mock_file_ops["exists"].return_value = True + add_allowlisted_ip("192.168.1.100") + + mock_file_ops["backup"].assert_called_once() + + def test_returns_error_on_write_failure(self, mock_file_ops): + """Should return error if write fails.""" + from nssec.modules.waf import add_allowlisted_ip + + mock_file_ops["read"].return_value = "" + mock_file_ops["write"].return_value = False + result = add_allowlisted_ip("192.168.1.100") + + assert not result.success + assert "Failed to write" in result.error + + +class TestRemoveAllowlistedIp: + """Tests for remove_allowlisted_ip function.""" + + def test_removes_existing_ip(self, mock_file_ops): + """Should remove an existing IP from allowlist.""" + from nssec.modules.waf import remove_allowlisted_ip + + mock_file_ops["read"].return_value = """ +SecRule REMOTE_ADDR "@ipMatch 192.168.1.100" "id:1000101,phase:1,pass" +SecRule REMOTE_ADDR "@ipMatch 10.0.0.1" "id:1000102,phase:1,pass" +""" + result = remove_allowlisted_ip("192.168.1.100") + + assert result.success + assert "192.168.1.100" in result.message + mock_file_ops["write"].assert_called_once() + + def test_skips_nonexistent_ip(self, mock_file_ops): + """Should skip if IP not in allowlist.""" + from nssec.modules.waf import remove_allowlisted_ip + + mock_file_ops["read"].return_value = "" + result = remove_allowlisted_ip("192.168.1.100") + + assert result.skipped + mock_file_ops["write"].assert_not_called() + + def test_creates_backup_before_write(self, mock_file_ops): + """Should backup existing config before writing.""" + from nssec.modules.waf import remove_allowlisted_ip + + mock_file_ops["read"].return_value = """ +SecRule REMOTE_ADDR "@ipMatch 192.168.1.100" "id:1000101,phase:1,pass" +""" + mock_file_ops["exists"].return_value = True + remove_allowlisted_ip("192.168.1.100") + + mock_file_ops["backup"].assert_called_once() + + def test_returns_error_on_write_failure(self, mock_file_ops): + """Should return error if write fails.""" + from nssec.modules.waf import remove_allowlisted_ip + + mock_file_ops["read"].return_value = """ +SecRule REMOTE_ADDR "@ipMatch 192.168.1.100" "id:1000101,phase:1,pass" +""" + mock_file_ops["write"].return_value = False + result = remove_allowlisted_ip("192.168.1.100") + + assert not result.success + assert "Failed to write" in result.error + + +class TestEvasiveConfTemplate: + """Tests for the evasive.conf Jinja2 template.""" + + def test_template_renders_with_required_directives(self): + """Should render a valid evasive.conf with all key directives.""" + from nssec.modules.waf.config import EVASIVE_CONF_TEMPLATE + + rendered = Template(EVASIVE_CONF_TEMPLATE).render( + timestamp="2026-01-01 00:00 UTC", + log_dir="/var/log/apache2/mod_evasive", + ) + assert "DOSHashTableSize" in rendered + assert "DOSPageCount" in rendered + assert "DOSSiteCount" in rendered + assert "DOSBlockingPeriod" in rendered + assert "DOSWhitelist" in rendered + assert "127.0.0.1" in rendered + assert "/var/log/apache2/mod_evasive" in rendered + + def test_template_whitelists_rfc1918(self): + """Should whitelist all RFC 1918 private ranges.""" + from nssec.modules.waf.config import EVASIVE_CONF_TEMPLATE + + rendered = Template(EVASIVE_CONF_TEMPLATE).render( + timestamp="test", log_dir="/tmp", + ) + assert "10.*.*.*" in rendered + assert "172.16.*.*" in rendered + assert "172.31.*.*" in rendered + assert "192.168.*.*" in rendered + + def test_template_has_tuned_thresholds(self): + """Thresholds should be tuned for NetSapiens traffic patterns.""" + from nssec.modules.waf.config import EVASIVE_CONF_TEMPLATE + + rendered = Template(EVASIVE_CONF_TEMPLATE).render( + timestamp="test", + log_dir="/tmp", + ) + # DOSPageCount should be > 2 (default is too aggressive) + assert "DOSPageCount 15" in rendered + # DOSSiteCount tuned for ~318 peak req/s across ~372 IPs + assert "DOSSiteCount 60" in rendered + # Extended blocking period for active scanners + assert "DOSBlockingPeriod 60" in rendered + + +class TestSetupEvasiveConfig: + """Tests for ModSecurityInstaller.setup_evasive_config.""" + + def test_writes_evasive_config(self, mock_file_ops): + """Should write evasive config with tuned thresholds.""" + from nssec.modules.waf import ModSecurityInstaller + + with patch("nssec.modules.waf.Path"): + installer = ModSecurityInstaller(mode="On") + result = installer.setup_evasive_config() + + assert result.success + assert "evasive" in result.message.lower() + mock_file_ops["write"].assert_called_once() + + def test_skips_when_evasive_disabled(self, mock_file_ops): + """Should skip when install_evasive is False.""" + from nssec.modules.waf import ModSecurityInstaller + + installer = ModSecurityInstaller(install_evasive=False) + result = installer.setup_evasive_config() + + assert result.skipped + mock_file_ops["write"].assert_not_called() + + def test_dry_run_does_not_write(self, mock_file_ops): + """Should not write in dry run mode.""" + from nssec.modules.waf import ModSecurityInstaller + + installer = ModSecurityInstaller(dry_run=True) + result = installer.setup_evasive_config() + + assert result.success + assert "Would write" in result.message + mock_file_ops["write"].assert_not_called() + + def test_backs_up_existing_config(self, mock_file_ops): + """Should backup existing evasive config before writing.""" + from nssec.modules.waf import ModSecurityInstaller + + mock_file_ops["exists"].return_value = True + with patch("nssec.modules.waf.Path"): + installer = ModSecurityInstaller(mode="On") + installer.setup_evasive_config() + + mock_file_ops["backup"].assert_called() + + def test_returns_error_on_write_failure(self, mock_file_ops): + """Should return error if write fails.""" + from nssec.modules.waf import ModSecurityInstaller + + mock_file_ops["write"].return_value = False + installer = ModSecurityInstaller(mode="On") + result = installer.setup_evasive_config() + + assert not result.success + assert "Failed to write" in result.error + + +class TestSetEvasiveState: + """Tests for ModSecurityInstaller.set_evasive_state.""" + + def test_enables_evasive_module(self, mock_file_ops): + """Should run a2enmod evasive when enabling.""" + from nssec.modules.waf import ModSecurityInstaller + + with patch("nssec.modules.waf.package_installed", return_value=True), \ + patch("nssec.modules.waf.run_cmd", return_value=("", "", 0)) as mock_run: + mock_file_ops["exists"].return_value = False # not currently enabled + installer = ModSecurityInstaller() + result = installer.set_evasive_state(enable=True) + + assert result.success + assert "Enabled" in result.message + mock_run.assert_called_once_with(["a2enmod", "evasive"]) + + def test_disables_evasive_module(self, mock_file_ops): + """Should run a2dismod evasive when disabling.""" + from nssec.modules.waf import ModSecurityInstaller + + with patch("nssec.modules.waf.package_installed", return_value=True), \ + patch("nssec.modules.waf.run_cmd", return_value=("", "", 0)) as mock_run: + mock_file_ops["exists"].return_value = True # currently enabled + installer = ModSecurityInstaller() + result = installer.set_evasive_state(enable=False) + + assert result.success + assert "Disabled" in result.message + mock_run.assert_called_once_with(["a2dismod", "evasive"]) + + def test_skips_if_already_enabled(self, mock_file_ops): + """Should skip if evasive already enabled and requesting enable.""" + from nssec.modules.waf import ModSecurityInstaller + + with patch("nssec.modules.waf.package_installed", return_value=True): + mock_file_ops["exists"].return_value = True # already enabled + installer = ModSecurityInstaller() + result = installer.set_evasive_state(enable=True) + + assert result.skipped + assert "already enabled" in result.message + + def test_skips_if_already_disabled(self, mock_file_ops): + """Should skip if evasive already disabled and requesting disable.""" + from nssec.modules.waf import ModSecurityInstaller + + with patch("nssec.modules.waf.package_installed", return_value=True): + mock_file_ops["exists"].return_value = False # already disabled + installer = ModSecurityInstaller() + result = installer.set_evasive_state(enable=False) + + assert result.skipped + assert "already disabled" in result.message + + def test_skips_if_not_installed(self, mock_file_ops): + """Should skip if mod_evasive package not installed.""" + from nssec.modules.waf import ModSecurityInstaller + + with patch("nssec.modules.waf.package_installed", return_value=False): + installer = ModSecurityInstaller() + result = installer.set_evasive_state(enable=True) + + assert result.skipped + assert "not installed" in result.message + + def test_dry_run_does_not_change_state(self, mock_file_ops): + """Should not run commands in dry run mode.""" + from nssec.modules.waf import ModSecurityInstaller + + with patch("nssec.modules.waf.package_installed", return_value=True), \ + patch("nssec.modules.waf.run_cmd") as mock_run: + mock_file_ops["exists"].return_value = False + installer = ModSecurityInstaller(dry_run=True) + result = installer.set_evasive_state(enable=True) + + assert result.success + assert "Would enable" in result.message + mock_run.assert_not_called() + + def test_returns_error_on_command_failure(self, mock_file_ops): + """Should return error if a2enmod/a2dismod fails.""" + from nssec.modules.waf import ModSecurityInstaller + + with patch("nssec.modules.waf.package_installed", return_value=True), \ + patch("nssec.modules.waf.run_cmd", return_value=("", "error", 1)): + mock_file_ops["exists"].return_value = False + installer = ModSecurityInstaller() + result = installer.set_evasive_state(enable=True) + + assert not result.success + assert "Failed to enable" in result.error + + +class TestSetModeEvasiveIntegration: + """Tests for set_mode toggling mod_evasive alongside ModSecurity.""" + + def test_enable_mode_enables_evasive(self, mock_file_ops): + """Switching to On mode should enable mod_evasive.""" + from nssec.modules.waf import ModSecurityInstaller + + mock_file_ops["read"].return_value = "SecRuleEngine DetectionOnly\n" + with patch("nssec.modules.waf.package_installed", return_value=True), \ + patch("nssec.modules.waf.run_cmd", return_value=("", "", 0)) as mock_run: + mock_file_ops["exists"].return_value = False # evasive not enabled + installer = ModSecurityInstaller() + result = installer.set_mode("On") + + assert result.success + # Should have called a2enmod evasive + run_calls = [str(c) for c in mock_run.call_args_list] + assert any("a2enmod" in c and "evasive" in c for c in run_calls) + + def test_detect_mode_disables_evasive(self, mock_file_ops): + """Switching to DetectionOnly should disable mod_evasive.""" + from nssec.modules.waf import ModSecurityInstaller + + mock_file_ops["read"].return_value = "SecRuleEngine On\n" + with patch("nssec.modules.waf.package_installed", return_value=True), \ + patch("nssec.modules.waf.run_cmd", return_value=("", "", 0)) as mock_run: + mock_file_ops["exists"].return_value = True # evasive currently enabled + installer = ModSecurityInstaller() + result = installer.set_mode("DetectionOnly") + + assert result.success + # Should have called a2dismod evasive + run_calls = [str(c) for c in mock_run.call_args_list] + assert any("a2dismod" in c and "evasive" in c for c in run_calls) + assert "mod_evasive disabled" in result.message