From 30d6b5b2e50ea5569c3609987d6b300281100451 Mon Sep 17 00:00:00 2001 From: Sumner Robinson Date: Fri, 20 Feb 2026 16:41:44 -0500 Subject: [PATCH 1/3] updated for Python 3.8 support --- .github/workflows/release.yml | 4 +- README.md | 2 + pyproject.toml | 9 ++-- src/nssec/cli/__init__.py | 2 + src/nssec/cli/audit.py | 32 +----------- src/nssec/cli/main.py | 74 ++++++++++++++++++-------- src/nssec/core/cache.py | 11 +++- src/nssec/core/checklist.py | 2 + src/nssec/core/checks.py | 2 + src/nssec/core/config.py | 2 + src/nssec/core/server_types.py | 2 + src/nssec/core/ssh.py | 87 +++++++++++++++++++++++++++---- src/nssec/modules/waf/__init__.py | 20 +++---- src/nssec/modules/waf/status.py | 2 + src/nssec/modules/waf/types.py | 2 + src/nssec/modules/waf/utils.py | 34 +++++------- 16 files changed, 190 insertions(+), 97 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 64e35dc..ad63731 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -15,7 +15,7 @@ permissions: jobs: test: - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 @@ -28,7 +28,7 @@ jobs: build: needs: test - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v4 diff --git a/README.md b/README.md index 4639acf..8426918 100644 --- a/README.md +++ b/README.md @@ -135,6 +135,8 @@ These community projects provide additional NetSapiens security capabilities: - **[rekeyandsync](https://github.com/kselkowitz/rekeyandsync)** — Rekey and resync SIP device credentials +- **[ua-monitor](https://github.com/traviscw/ua-monitor)** — SIP Registration Monitor for VoIPMonitor + ## Roadmap - [x] ModSecurity installation and configuration with OWASP CRS diff --git a/pyproject.toml b/pyproject.toml index b4228b1..499fda4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ version = "0.1.0" description = "Open-source NetSapiens security platform - audit tools and hardening automation" readme = "README.md" license = "Apache-2.0" -requires-python = ">=3.9" +requires-python = ">=3.8" authors = [ { name = "Sumner Robinson" } ] @@ -20,6 +20,7 @@ classifiers = [ "License :: OSI Approved :: Apache Software License", "Operating System :: POSIX :: Linux", "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", @@ -63,11 +64,11 @@ include = [ [tool.black] line-length = 100 -target-version = ["py39"] +target-version = ["py38"] [tool.ruff] line-length = 100 -target-version = "py39" +target-version = "py38" [tool.ruff.lint] select = ["E", "F", "I", "N", "W", "UP"] @@ -76,7 +77,7 @@ select = ["E", "F", "I", "N", "W", "UP"] "src/nssec/modules/waf/config.py" = ["E501"] [tool.mypy] -python_version = "3.9" +python_version = "3.8" warn_return_any = true warn_unused_configs = true diff --git a/src/nssec/cli/__init__.py b/src/nssec/cli/__init__.py index 0fecacc..a3e51fa 100644 --- a/src/nssec/cli/__init__.py +++ b/src/nssec/cli/__init__.py @@ -3,6 +3,8 @@ Shared utilities used by CLI sub-modules (audit, waf_commands, etc.). """ +from __future__ import annotations + from pathlib import Path from typing import Optional diff --git a/src/nssec/cli/audit.py b/src/nssec/cli/audit.py index 241d7c1..45528ac 100644 --- a/src/nssec/cli/audit.py +++ b/src/nssec/cli/audit.py @@ -132,22 +132,6 @@ def _display_audit_summary(failed, warnings, passed, skipped, results): ) -def _connect_remote_host(host): - """Set up SSH connection to a remote host for auditing.""" - from nssec.core.cache import session_cache - from nssec.core.ssh import SSHExecutor, set_remote_host - - console.print(f"[bold]Connecting to {host}...[/bold]") - executor = SSHExecutor(host) - success, message = executor.test_connection() - if not success: - console.print(f"[red]SSH connection failed: {message}[/red]") - raise SystemExit(1) - console.print(f"[green]{message}[/green]\n") - set_remote_host(host) - session_cache.clear() - - def _filter_checks(applicable, category, checks, skip): """Apply category, include, and exclude filters to check list.""" if category: @@ -161,11 +145,6 @@ def _filter_checks(applicable, category, checks, skip): @audit.command("run") -@click.option( - "--host", - "-H", - help="Remote host to audit via SSH (e.g., user@hostname)", -) @click.option( "--checks", "-c", @@ -184,17 +163,10 @@ def _filter_checks(applicable, category, checks, skip): type=click.Choice(["apiban", "firewall", "ssh", "mysql", "netsapiens"]), help="Run only checks in category", ) -def audit_run(host, checks, skip, verbose, category): - """Run a full security audit. - - Use --host to audit a remote server via SSH: - nssec audit run --host ubuntu@myserver.example.com - """ +def audit_run(checks, skip, verbose, category): + """Run a full security audit.""" from nssec.core.checks import get_checks_for_server_type - if host: - _connect_remote_host(host) - server_type = detect_server_type() console.print(f"[bold]Security Audit - {server_type.value.upper()} Server[/bold]\n") diff --git a/src/nssec/cli/main.py b/src/nssec/cli/main.py index bae3876..8fbe68a 100644 --- a/src/nssec/cli/main.py +++ b/src/nssec/cli/main.py @@ -14,15 +14,66 @@ ) +def _setup_remote_host(host: str, use_sudo: bool = False) -> None: + """Set up SSH connection to a remote host.""" + from nssec.core.cache import session_cache + from nssec.core.ssh import SSHExecutor, set_remote_host, set_use_sudo + + console.print(f"[bold]Connecting to {host}...[/bold]") + executor = SSHExecutor(host) + success, message = executor.test_connection() + if not success: + console.print(f"[red]SSH connection failed: {message}[/red]") + raise SystemExit(1) + console.print(f"[green]{message}[/green]") + set_remote_host(host) + set_use_sudo(use_sudo) + if use_sudo: + console.print("[dim]Sudo enabled - you may be prompted for password[/dim]") + console.print() + session_cache.clear() + + @click.group() @click.version_option(version=__version__, prog_name="nssec") +@click.option( + "--host", + "-H", + envvar="NSSEC_HOST", + help="Remote host to connect via SSH (e.g., user@hostname). Can also set NSSEC_HOST env var.", +) +@click.option( + "--sudo", + "-S", + is_flag=True, + envvar="NSSEC_SUDO", + help="Run commands with sudo (for privileged operations like waf init).", +) @click.pass_context -def cli(ctx): +def cli(ctx, host, sudo): """NS-Security: Open-source NetSapiens security platform. Audit tools and hardening automation for NetSapiens clusters. + + Use --host to run commands on a remote NetSapiens server via SSH: + + nssec --host ubuntu@myserver.example.com audit run + + Use --sudo for commands that require root privileges: + + nssec --host ubuntu@myserver.example.com --sudo waf init """ ctx.ensure_object(dict) + ctx.obj["host"] = host + ctx.obj["sudo"] = sudo + + # Set up sudo for local execution if no host specified + if sudo and not host: + from nssec.core.ssh import set_use_sudo + set_use_sudo(True) + + if host: + _setup_remote_host(host, use_sudo=sudo) # ─── SERVER COMMANDS ─── @@ -68,27 +119,8 @@ def _print_detection_results(info): @server.command("detect") -@click.option( - "--host", - "-H", - help="Remote host to detect via SSH (e.g., user@hostname)", -) -def server_detect(host): +def server_detect(): """Detect the NetSapiens server type.""" - from nssec.core.cache import session_cache - from nssec.core.ssh import SSHExecutor, set_remote_host - - if host: - console.print(f"[bold]Connecting to {host}...[/bold]") - executor = SSHExecutor(host) - success, message = executor.test_connection() - if not success: - console.print(f"[red]SSH connection failed: {message}[/red]") - raise SystemExit(1) - console.print(f"[green]{message}[/green]\n") - set_remote_host(host) - session_cache.clear() - info = get_server_info() console.print(f"\n[bold cyan]Server Type:[/bold cyan] {info['server_type'].upper()}") diff --git a/src/nssec/core/cache.py b/src/nssec/core/cache.py index 61b988c..da19ff6 100644 --- a/src/nssec/core/cache.py +++ b/src/nssec/core/cache.py @@ -17,6 +17,8 @@ session_cache.clear() """ +from __future__ import annotations + import threading import time from pathlib import Path @@ -25,6 +27,11 @@ from nssec.core import ssh +def _remove_suffix(text: str, suffix: str) -> str: + """Remove suffix from string (Python 3.8 compatible).""" + return text[:-len(suffix)] if suffix and text.endswith(suffix) else text + + def _run_subprocess(cmd: list[str], timeout: int = 30) -> tuple[str, int]: """Run a subprocess command safely (locally or via SSH). @@ -70,7 +77,7 @@ def _parse_service_line(line: str) -> tuple[Optional[str], Optional[str]]: if not parts: return None, None service_unit = parts[0] - service_name = service_unit.removesuffix(".service") + service_name = _remove_suffix(service_unit, ".service") return service_name, service_unit @@ -273,7 +280,7 @@ def cached_service_active(self, service_name: str) -> bool: if self._active_services is None or self._is_expired(self._services_time): self._load_services_cache() - normalized = service_name.removesuffix(".service") + normalized = _remove_suffix(service_name, ".service") return ( normalized in self._active_services or f"{normalized}.service" in self._active_services diff --git a/src/nssec/core/checklist.py b/src/nssec/core/checklist.py index 5907f04..da6fe51 100644 --- a/src/nssec/core/checklist.py +++ b/src/nssec/core/checklist.py @@ -1,5 +1,7 @@ """Security audit checklist engine.""" +from __future__ import annotations + from abc import ABC, abstractmethod from dataclasses import dataclass, field from enum import Enum diff --git a/src/nssec/core/checks.py b/src/nssec/core/checks.py index 291997c..469a9c1 100644 --- a/src/nssec/core/checks.py +++ b/src/nssec/core/checks.py @@ -4,6 +4,8 @@ Where possible, checks read config files directly rather than running commands. """ +from __future__ import annotations + from pathlib import Path from nssec.core.cache import cached_ufw_rules diff --git a/src/nssec/core/config.py b/src/nssec/core/config.py index 759d072..56046de 100644 --- a/src/nssec/core/config.py +++ b/src/nssec/core/config.py @@ -1,5 +1,7 @@ """YAML configuration management for nssec.""" +from __future__ import annotations + import os import re from pathlib import Path diff --git a/src/nssec/core/server_types.py b/src/nssec/core/server_types.py index 7762fe9..8e423a7 100644 --- a/src/nssec/core/server_types.py +++ b/src/nssec/core/server_types.py @@ -1,5 +1,7 @@ """NetSapiens server type detection via systemd services and packages.""" +from __future__ import annotations + from dataclasses import dataclass from enum import Enum from typing import Optional diff --git a/src/nssec/core/ssh.py b/src/nssec/core/ssh.py index 64f3db1..91291bf 100644 --- a/src/nssec/core/ssh.py +++ b/src/nssec/core/ssh.py @@ -9,10 +9,16 @@ # Set the remote host for all subsequent operations set_remote_host("ubuntu@development-core") - # Now all checks will run via SSH + # Enable sudo for privileged commands + set_use_sudo(True) + + # Now all checks will run via SSH (with sudo) nssec audit run """ +from __future__ import annotations + +import os import subprocess from pathlib import Path from typing import Optional @@ -20,6 +26,9 @@ # Global remote host - when set, all commands execute via SSH _remote_host: Optional[str] = None +# Global sudo flag - when set, commands are prefixed with sudo +_use_sudo: bool = False + def set_remote_host(host: Optional[str]) -> None: """Set the remote host for SSH execution. @@ -37,23 +46,66 @@ def get_remote_host() -> Optional[str]: return _remote_host +def set_use_sudo(use_sudo: bool) -> None: + """Enable or disable sudo for command execution. + + Args: + use_sudo: If True, commands will be prefixed with sudo. + """ + global _use_sudo + _use_sudo = use_sudo + + +def get_use_sudo() -> bool: + """Check if sudo is enabled for command execution.""" + return _use_sudo + + def is_remote() -> bool: """Check if we're configured to run remotely.""" return _remote_host is not None +def is_root() -> bool: + """Check if we're running as root (locally or remotely). + + For remote execution, checks the effective user on the remote host. + For local execution, checks the local effective user ID. + + Returns: + True if running as root (or sudo is enabled), False otherwise. + """ + if _use_sudo: + return True + + if is_remote(): + # Check remote effective user ID + stdout, _, rc = run_command(["id", "-u"]) + if rc == 0: + try: + return int(stdout.strip()) == 0 + except ValueError: + pass + return False + + # Local check + return os.geteuid() == 0 + + class SSHExecutor: """Execute commands and read files over SSH.""" - def __init__(self, host: str, timeout: int = 30): + def __init__(self, host: str, timeout: int = 30, use_sudo: bool = False): """Initialize SSH executor. Args: host: SSH host string (e.g., "user@hostname"). timeout: Default timeout for SSH commands in seconds. + use_sudo: If True, prefix commands with sudo. """ self.host = host self.timeout = timeout + self.use_sudo = use_sudo # SSH options for non-interactive, reliable execution self.ssh_opts = [ "-o", @@ -68,12 +120,14 @@ def run_command( self, cmd: list[str], timeout: Optional[int] = None, + use_sudo: Optional[bool] = None, ) -> tuple[str, str, int]: """Run a command on the remote host via SSH. Args: cmd: Command and arguments to run. timeout: Timeout in seconds (uses default if not specified). + use_sudo: Override sudo setting for this command. If None, uses instance default. Returns: Tuple of (stdout, stderr, return_code). @@ -81,10 +135,21 @@ def run_command( if timeout is None: timeout = self.timeout + # Determine if we should use sudo + sudo = use_sudo if use_sudo is not None else self.use_sudo + # Build SSH command # Quote the remote command properly - remote_cmd = " ".join(_shell_quote(arg) for arg in cmd) - ssh_cmd = ["ssh"] + self.ssh_opts + [self.host, remote_cmd] + if sudo: + remote_cmd = "sudo " + " ".join(_shell_quote(arg) for arg in cmd) + else: + remote_cmd = " ".join(_shell_quote(arg) for arg in cmd) + + # Use -t to allocate TTY when sudo is needed (allows password prompt) + ssh_opts = self.ssh_opts.copy() + if sudo: + ssh_opts = ["-t"] + ssh_opts + ssh_cmd = ["ssh"] + ssh_opts + [self.host, remote_cmd] try: result = subprocess.run( @@ -173,9 +238,11 @@ def _shell_quote(s: str) -> str: def get_executor() -> Optional[SSHExecutor]: """Get the global SSH executor, or None if running locally.""" global _executor - if _remote_host and _executor is None: - _executor = SSHExecutor(_remote_host) - elif not _remote_host: + if _remote_host: + # Recreate executor if sudo setting changed + if _executor is None or _executor.use_sudo != _use_sudo: + _executor = SSHExecutor(_remote_host, use_sudo=_use_sudo) + else: _executor = None return _executor @@ -185,6 +252,7 @@ def run_command(cmd: list[str], timeout: int = 30) -> tuple[str, str, int]: This is a drop-in replacement for subprocess-based command execution that transparently handles SSH when a remote host is configured. + Respects the global sudo setting for both local and remote execution. Args: cmd: Command and arguments to run. @@ -198,9 +266,10 @@ def run_command(cmd: list[str], timeout: int = 30) -> tuple[str, str, int]: return executor.run_command(cmd, timeout) # Local execution + actual_cmd = ["sudo"] + cmd if _use_sudo else cmd try: result = subprocess.run( - cmd, + actual_cmd, capture_output=True, text=True, timeout=timeout, @@ -209,7 +278,7 @@ def run_command(cmd: list[str], timeout: int = 30) -> tuple[str, str, int]: except subprocess.TimeoutExpired: return "", f"Command timed out after {timeout}s", -1 except FileNotFoundError: - return "", f"Command not found: {cmd[0]}", -1 + return "", f"Command not found: {actual_cmd[0]}", -1 except Exception as e: return "", str(e), -1 diff --git a/src/nssec/modules/waf/__init__.py b/src/nssec/modules/waf/__init__.py index 932cd07..f4bed10 100644 --- a/src/nssec/modules/waf/__init__.py +++ b/src/nssec/modules/waf/__init__.py @@ -6,11 +6,12 @@ from __future__ import annotations -import os import re import shutil from pathlib import Path +from nssec.core.ssh import is_directory, is_root + from nssec.modules.waf.config import ( BACKUP_SUFFIX, CRS_APT_PACKAGE, @@ -80,9 +81,9 @@ def preflight(self) -> PreflightResult: """Run all preflight checks and return results.""" pf = PreflightResult() - pf.is_root = os.geteuid() == 0 + pf.is_root = is_root() if not pf.is_root: - pf.errors.append("Must run as root (sudo nssec waf init)") + pf.errors.append("Must run as root (use --sudo flag or run with sudo)") pf.apache_installed = package_installed("apache2") if not pf.apache_installed: @@ -107,14 +108,15 @@ def preflight(self) -> PreflightResult: return pf def _detect_crs(self) -> tuple[bool, str | None, str | None]: - """Detect CRS installation and version.""" + """Detect CRS installation and version. SSH-aware.""" for search_path in CRS_SEARCH_PATHS: - if not Path(search_path).is_dir(): + if not is_directory(search_path): continue - version_file = Path(search_path) / "VERSION" - if version_file.exists(): - return True, version_file.read_text().strip(), search_path - if (Path(search_path) / "rules").is_dir(): + version_file = f"{search_path}/VERSION" + version_content = read_file(version_file) + if version_content: + return True, version_content.strip(), search_path + if is_directory(f"{search_path}/rules"): return True, None, search_path if package_installed(CRS_APT_PACKAGE): diff --git a/src/nssec/modules/waf/status.py b/src/nssec/modules/waf/status.py index f83420d..ddc9bd6 100644 --- a/src/nssec/modules/waf/status.py +++ b/src/nssec/modules/waf/status.py @@ -1,5 +1,7 @@ """WAF status reporting.""" +from __future__ import annotations + from dataclasses import dataclass, field from pathlib import Path from typing import Optional diff --git a/src/nssec/modules/waf/types.py b/src/nssec/modules/waf/types.py index b66c6fb..05a3739 100644 --- a/src/nssec/modules/waf/types.py +++ b/src/nssec/modules/waf/types.py @@ -1,5 +1,7 @@ """Data types for the WAF module.""" +from __future__ import annotations + from dataclasses import dataclass, field from typing import Optional diff --git a/src/nssec/modules/waf/utils.py b/src/nssec/modules/waf/utils.py index 58e3f86..7fdf72a 100644 --- a/src/nssec/modules/waf/utils.py +++ b/src/nssec/modules/waf/utils.py @@ -3,46 +3,40 @@ from __future__ import annotations import shutil -import subprocess from datetime import datetime, timezone from pathlib import Path from jinja2 import Template +from nssec.core import ssh from nssec.modules.waf.config import BACKUP_SUFFIX, SECURITY2_CONF def run_cmd(cmd: list[str], timeout: int = 120) -> tuple[str, str, int]: - """Run a command and return (stdout, stderr, returncode).""" - try: - result = subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=timeout, - ) - return result.stdout, result.stderr, result.returncode - except subprocess.TimeoutExpired: - return "", f"Command timed out after {timeout}s", -1 - except FileNotFoundError: - return "", f"Command not found: {cmd[0]}", -1 + """Run a command and return (stdout, stderr, returncode). + + Uses SSH-aware execution - works locally or remotely. + """ + return ssh.run_command(cmd, timeout) def package_installed(package: str) -> bool: - """Check if a deb package is installed.""" + """Check if a deb package is installed. + + Uses SSH-aware execution - works locally or remotely. + """ _, _, rc = run_cmd(["dpkg", "-s", package]) return rc == 0 def file_exists(path: str) -> bool: - return Path(path).exists() + """Check if a file exists. SSH-aware.""" + return ssh.file_exists(path) def read_file(path: str) -> str | None: - try: - return Path(path).read_text() - except (OSError, PermissionError): - return None + """Read a file. SSH-aware.""" + return ssh.read_file(path) def backup_file(path: str) -> str | None: From bcc5607daf442288e4acf9087a8dd4d382a00b9f Mon Sep 17 00:00:00 2001 From: Sumner Robinson Date: Fri, 20 Feb 2026 22:18:36 -0500 Subject: [PATCH 2/3] update ip whitelist --- src/nssec/cli/waf_commands.py | 97 ++++++++++++++++++++++++++++++- src/nssec/modules/waf/__init__.py | 36 ++++++++++++ src/nssec/modules/waf/config.py | 12 +++- 3 files changed, 143 insertions(+), 2 deletions(-) diff --git a/src/nssec/cli/waf_commands.py b/src/nssec/cli/waf_commands.py index 6ca546c..fedcfd0 100644 --- a/src/nssec/cli/waf_commands.py +++ b/src/nssec/cli/waf_commands.py @@ -292,7 +292,25 @@ def waf_status(): console.print(f" [dim]{line}[/dim]") -@waf.command("allowlist") +@waf.group("allowlist", invoke_without_command=True) +@click.pass_context +def waf_allowlist(ctx): + """Manage allowlisted IPs for reduced WAF strictness.""" + if ctx.invoked_subcommand is None: + # Default behavior: show allowlist + from nssec.modules.waf import get_allowlisted_ips + + ips = get_allowlisted_ips() + if not ips: + console.print("[dim]No IPs currently allowlisted.[/dim]") + return + + console.print(f"[bold]Allowlisted IPs[/bold] ({len(ips)})\n") + for ip in ips: + console.print(f" {ip}") + + +@waf_allowlist.command("show") def waf_allowlist_show(): """Show current allowlisted IPs.""" from nssec.modules.waf import get_allowlisted_ips @@ -305,3 +323,80 @@ def waf_allowlist_show(): console.print(f"[bold]Allowlisted IPs[/bold] ({len(ips)})\n") for ip in ips: console.print(f" {ip}") + + +@waf_allowlist.command("add") +@click.argument("ip") +@click.option("--yes", "-y", is_flag=True, help="Skip confirmation") +def waf_allowlist_add(ip, yes): + """Add an IP address to the WAF allowlist. + + IP can be a single address (192.168.1.1) or CIDR notation (10.0.0.0/8). + Allowlisted IPs bypass OWASP CRS rules for reduced false positives. + """ + from nssec.modules.waf import ModSecurityInstaller, get_allowlisted_ips, add_allowlisted_ip + + installer = ModSecurityInstaller() + pf = installer.preflight() + _require_root_and_modsec(pf, "sudo nssec waf allowlist add") + + current_ips = get_allowlisted_ips() + if ip in current_ips: + console.print(f"[yellow]IP {ip} is already allowlisted.[/yellow]") + return + + console.print(f"Adding [cyan]{ip}[/cyan] to WAF allowlist...") + + result = add_allowlisted_ip(ip) + if not result.success: + console.print(f" [red]Error:[/red] {result.error}") + raise SystemExit(1) + console.print(f" [green]Done:[/green] {result.message}") + + val = installer.validate_config() + if not val.success: + console.print(f" [red]Error:[/red] {val.error}") + raise SystemExit(1) + console.print(f" [green]Done:[/green] {val.message}") + + _prompt_and_reload_apache(installer, yes) + + +@waf_allowlist.command("delete") +@click.argument("ip") +@click.option("--yes", "-y", is_flag=True, help="Skip confirmation") +def waf_allowlist_delete(ip, yes): + """Remove an IP address from the WAF allowlist. + + IP must match exactly as it was added (including CIDR notation if used). + """ + from nssec.modules.waf import ModSecurityInstaller, get_allowlisted_ips, remove_allowlisted_ip + + installer = ModSecurityInstaller() + pf = installer.preflight() + _require_root_and_modsec(pf, "sudo nssec waf allowlist delete") + + current_ips = get_allowlisted_ips() + if ip not in current_ips: + console.print(f"[yellow]IP {ip} is not in the allowlist.[/yellow]") + if current_ips: + console.print("\nCurrent allowlisted IPs:") + for existing_ip in current_ips: + console.print(f" {existing_ip}") + return + + console.print(f"Removing [cyan]{ip}[/cyan] from WAF allowlist...") + + result = remove_allowlisted_ip(ip) + if not result.success: + console.print(f" [red]Error:[/red] {result.error}") + raise SystemExit(1) + console.print(f" [green]Done:[/green] {result.message}") + + val = installer.validate_config() + if not val.success: + console.print(f" [red]Error:[/red] {val.error}") + raise SystemExit(1) + console.print(f" [green]Done:[/green] {val.message}") + + _prompt_and_reload_apache(installer, yes) diff --git a/src/nssec/modules/waf/__init__.py b/src/nssec/modules/waf/__init__.py index f4bed10..b62e1b8 100644 --- a/src/nssec/modules/waf/__init__.py +++ b/src/nssec/modules/waf/__init__.py @@ -59,6 +59,42 @@ def get_allowlisted_ips() -> list[str]: return re.findall(r'id:10001\d+.*?@ipMatch\s+([^\s"]+)', content, re.DOTALL) +def add_allowlisted_ip(ip: str) -> StepResult: + """Add an IP address to the allowlist and regenerate exclusions config.""" + current_ips = get_allowlisted_ips() + if ip in current_ips: + return StepResult(skipped=True, message=f"{ip} already allowlisted") + + new_ips = current_ips + [ip] + + if file_exists(NS_EXCLUSIONS_CONF): + backup_file(NS_EXCLUSIONS_CONF) + + content = render(NS_EXCLUSIONS_TEMPLATE, admin_ips=new_ips) + if not write_file(NS_EXCLUSIONS_CONF, content): + return StepResult(success=False, error=f"Failed to write {NS_EXCLUSIONS_CONF}") + + return StepResult(message=f"Added {ip} to allowlist") + + +def remove_allowlisted_ip(ip: str) -> StepResult: + """Remove an IP address from the allowlist and regenerate exclusions config.""" + current_ips = get_allowlisted_ips() + if ip not in current_ips: + return StepResult(skipped=True, message=f"{ip} not in allowlist") + + new_ips = [existing for existing in current_ips if existing != ip] + + if file_exists(NS_EXCLUSIONS_CONF): + backup_file(NS_EXCLUSIONS_CONF) + + content = render(NS_EXCLUSIONS_TEMPLATE, admin_ips=new_ips) + if not write_file(NS_EXCLUSIONS_CONF, content): + return StepResult(success=False, error=f"Failed to write {NS_EXCLUSIONS_CONF}") + + return StepResult(message=f"Removed {ip} from allowlist") + + class ModSecurityInstaller: """Idempotent ModSecurity v2 + OWASP CRS v4 installer for Apache2.""" diff --git a/src/nssec/modules/waf/config.py b/src/nssec/modules/waf/config.py index 43a0e13..befdf81 100644 --- a/src/nssec/modules/waf/config.py +++ b/src/nssec/modules/waf/config.py @@ -193,9 +193,19 @@ nolog,\\ ctl:ruleRemoveById=920420" +# ---- Phone provisioning config files (.cfg, .xml) ---- +# Phones constantly fetch config files from /cfg/ - this is expected NDP behavior. +# Rule 920440 blocks .cfg extension by policy; exclude the entire /cfg/ path. +SecRule REQUEST_URI "@beginsWith /cfg/" \\ + "id:1000004,\\ + phase:1,\\ + pass,\\ + nolog,\\ + ctl:ruleRemoveById=920440" + # ---- iNSight health checks ---- SecRule REQUEST_URI "@beginsWith /cfg/insight_healthcheck" \\ - "id:1000004,\\ + "id:1000006,\\ phase:1,\\ pass,\\ nolog,\\ From 83e383800ba4b2c35a13a906f1c041aea4073849 Mon Sep 17 00:00:00 2001 From: Sumner Robinson Date: Fri, 20 Feb 2026 22:22:02 -0500 Subject: [PATCH 3/3] update ip whitelist --- src/nssec/modules/waf/status.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/nssec/modules/waf/status.py b/src/nssec/modules/waf/status.py index ddc9bd6..d7fd777 100644 --- a/src/nssec/modules/waf/status.py +++ b/src/nssec/modules/waf/status.py @@ -56,7 +56,9 @@ def _read_file(path: str) -> Optional[str]: def _tail_file(path: str, lines: int = 10) -> list[str]: """Return the last N lines of a file.""" try: - all_lines = Path(path).read_text().splitlines() + # Audit log may contain binary request bodies; use replace to handle them + content = Path(path).read_text(errors="replace") + all_lines = content.splitlines() return all_lines[-lines:] except (OSError, PermissionError): return []