diff --git a/src/nssec/cli/waf_commands.py b/src/nssec/cli/waf_commands.py
index e9d87f7..c1f05ef 100644
--- a/src/nssec/cli/waf_commands.py
+++ b/src/nssec/cli/waf_commands.py
@@ -116,7 +116,29 @@ def _build_status_table(status):
table.add_column("Property", style="cyan")
table.add_column("Value")
- table.add_row("ModSecurity installed", _yn(status.modsec_installed))
+ if status.apache_version:
+ apache_val = f"v{status.apache_version}"
+ if status.apache_ppa:
+ apache_val += " [cyan](ondrej PPA)[/cyan]"
+ table.add_row("Apache", apache_val)
+
+ if status.modsec_installed and status.modsec_version:
+ from nssec.modules.waf.utils import version_gte
+
+ ver_str = f"v{status.modsec_version}"
+ if not version_gte(status.modsec_version, "2.9.6") and status.disabled_crs_rules > 0:
+ table.add_row(
+ "ModSecurity installed",
+ f"[yellow]yes ({ver_str} — {status.disabled_crs_rules} CRS rule(s) disabled, "
+ f"run [cyan]nssec waf update[/cyan] to upgrade)[/yellow]",
+ )
+ else:
+ table.add_row(
+ "ModSecurity installed",
+ f"[green]yes ({ver_str})[/green]",
+ )
+ else:
+ table.add_row("ModSecurity installed", _yn(status.modsec_installed))
table.add_row("Module enabled", _yn(status.modsec_enabled))
if status.modsec_mode:
@@ -130,6 +152,10 @@ def _build_status_table(status):
table.add_row("OWASP CRS", f"[green]{crs}[/green]")
if status.crs_path:
table.add_row("CRS path", status.crs_path)
+ setup_val = _yn(status.crs_setup_present)
+ if not status.crs_setup_present:
+ setup_val += " [red](rule 901001 will flag all traffic! run [cyan]nssec waf init[/cyan])[/red]"
+ table.add_row("crs-setup.conf", setup_val)
else:
table.add_row("OWASP CRS", "[red]not installed[/red]")
@@ -139,7 +165,35 @@ def _build_status_table(status):
else:
table.add_row("mod_evasive", "[dim]not installed[/dim]")
- table.add_row("NS exclusions", _yn(status.exclusions_present, "yellow"))
+ # NS exclusions detail
+ if status.exclusions_present:
+ if not status.exclusions_included:
+ excl_val = (
+ "[red]not loaded[/red] — security2.conf does not include exclusions file, "
+ "run [cyan]nssec waf init[/cyan] to fix"
+ )
+ elif not status.crs_path_valid:
+ excl_val = (
+ "[yellow]loaded but ineffective[/yellow] — "
+ "CRS misconfigured (missing crs-setup.conf), "
+ "run [cyan]nssec waf init[/cyan] to fix"
+ )
+ elif not status.exclusions_current:
+ v = status.exclusions_version or "unknown"
+ excl_val = (
+ f"[yellow]outdated (v{v})[/yellow] — "
+ "run [cyan]nssec waf update-exclusions[/cyan]"
+ )
+ else:
+ excl_val = "[green]active (v{})[/green]".format(status.exclusions_version)
+ table.add_row("NS exclusions", excl_val)
+ if status.exclusions_admin_ips:
+ table.add_row(" Admin IPs", str(status.exclusions_admin_ips))
+ if status.exclusions_nodeping_ips:
+ table.add_row(" NodePing IPs", str(status.exclusions_nodeping_ips))
+ else:
+ table.add_row("NS exclusions", "[yellow]not deployed[/yellow]")
+
table.add_row("Audit log", _yn(status.audit_log_exists, "dim"))
return table
@@ -194,8 +248,17 @@ def waf_init(mode, skip_evasive, yes, dry_run):
console.print("[yellow]Aborted.[/yellow]")
return
+ # Fetch NodePing monitoring probe IPs for WAF allowlisting
+ from nssec.modules.waf import fetch_nodeping_probe_ips
+
+ nodeping_ips, np_err = fetch_nodeping_probe_ips()
+ if np_err:
+ console.print(f" [yellow]Warning:[/yellow] {np_err}")
+ elif nodeping_ips:
+ console.print(f" Fetched {len(nodeping_ips)} NodePing probe IPs for WAF allowlisting")
+
console.print()
- result = installer.run()
+ result = installer.run(nodeping_ips=nodeping_ips)
_print_install_results(result)
if not result.success:
@@ -348,7 +411,16 @@ def waf_update_exclusions(yes, dry_run):
console.print("[bold]Updating NetSapiens WAF exclusions...[/bold]")
- result = installer.install_exclusions()
+ # Fetch NodePing monitoring probe IPs for WAF allowlisting
+ from nssec.modules.waf import fetch_nodeping_probe_ips
+
+ nodeping_ips, np_err = fetch_nodeping_probe_ips()
+ if np_err:
+ console.print(f" [yellow]Warning:[/yellow] {np_err}")
+ elif nodeping_ips:
+ console.print(f" Fetched {len(nodeping_ips)} NodePing probe IPs for WAF allowlisting")
+
+ result = installer.install_exclusions(nodeping_ips=nodeping_ips)
if not result.success:
console.print(f" [red]Error:[/red] {result.error}")
raise SystemExit(1)
@@ -383,6 +455,102 @@ def waf_status():
console.print(f" [dim]{line}[/dim]")
+@waf.command("update")
+@click.option("--yes", "-y", is_flag=True, help="Skip confirmation prompts")
+def waf_update(yes):
+ """Check ModSecurity version and re-enable CRS rules after upgrade.
+
+ \b
+ If ModSecurity < 2.9.6, shows instructions for adding the Digitalwave
+ ModSecurity repository to get a compatible version.
+
+ \b
+ If ModSecurity >= 2.9.6 (user already upgraded), re-enables any CRS
+ rules that were disabled during init and validates the Apache config.
+ """
+ from nssec.modules.waf import ModSecurityInstaller
+ from nssec.modules.waf.utils import detect_modsec_version, version_gte
+ from nssec.modules.waf.config import (
+ CRS_INSTALL_DIR,
+ DIGITALWAVE_KEY_URL,
+ DIGITALWAVE_KEYRING,
+ DIGITALWAVE_LIST,
+ DIGITALWAVE_REPO_URL,
+ )
+
+ installer = ModSecurityInstaller()
+ pf = installer.preflight()
+ _require_root_and_modsec(pf, "sudo nssec waf update")
+
+ current_ver = detect_modsec_version()
+ console.print(f"[bold]Current ModSecurity version:[/bold] {current_ver or 'unknown'}")
+
+ if not version_gte(current_ver, "2.9.6"):
+ console.print()
+ console.print(
+ "[yellow]ModSecurity < 2.9.6 — some CRS v4 rules are disabled.[/yellow]"
+ )
+ console.print(
+ "Ubuntu 22.04 ships ModSecurity 2.9.5 which lacks support for "
+ "multipart rules introduced in 2.9.6."
+ )
+ console.print()
+ console.print("[bold]To upgrade, add the Digitalwave ModSecurity repository:[/bold]")
+ console.print()
+ keyring = DIGITALWAVE_KEYRING
+ console.print(
+ f" [cyan]curl -fsSL {DIGITALWAVE_KEY_URL} "
+ f"| sudo gpg --dearmor -o {keyring}[/cyan]"
+ )
+ console.print()
+ # Escape square brackets so Rich doesn't treat [signed-by=...] as markup
+ signed = f"\\[signed-by={keyring}]"
+ repo = DIGITALWAVE_REPO_URL
+ lst = DIGITALWAVE_LIST
+ console.print(
+ f' [cyan]echo "deb {signed} {repo} $(lsb_release -sc) main" '
+ f"| sudo tee {lst}[/cyan]"
+ )
+ console.print(
+ f' [cyan]echo "deb {signed} {repo} $(lsb_release -sc)-backports main" '
+ f"| sudo tee -a {lst}[/cyan]"
+ )
+ console.print()
+ console.print(
+ " [cyan]sudo apt-get update[/cyan]"
+ )
+ console.print(
+ " [cyan]sudo apt-get install -t $(lsb_release -sc)-backports "
+ "libapache2-mod-security2[/cyan]"
+ )
+ console.print()
+ console.print(
+ "After upgrading, run [cyan]nssec waf update[/cyan] again to "
+ "re-enable the disabled CRS rules."
+ )
+ return
+
+ # ModSec >= 2.9.6 — re-enable any disabled rules
+ crs_path = pf.crs_path or CRS_INSTALL_DIR
+ reenabled = installer._reenable_crs_rules(crs_path)
+ if not reenabled:
+ console.print("[green]ModSecurity >= 2.9.6 and all CRS rules are active. Nothing to do.[/green]")
+ return
+
+ console.print(
+ f" [green]Done:[/green] Re-enabled {len(reenabled)} CRS rule(s): "
+ + ", ".join(reenabled)
+ )
+
+ val = installer.validate_config()
+ if not val.success:
+ console.print(f" [red]Error:[/red] {val.error}")
+ raise SystemExit(1)
+ console.print(f" [green]Done:[/green] {val.message}")
+
+ _prompt_and_reload_apache(installer, yes)
+
+
@waf.group("allowlist", invoke_without_command=True)
@click.pass_context
def waf_allowlist(ctx):
diff --git a/src/nssec/modules/waf/__init__.py b/src/nssec/modules/waf/__init__.py
index 352bb52..f228af2 100644
--- a/src/nssec/modules/waf/__init__.py
+++ b/src/nssec/modules/waf/__init__.py
@@ -17,6 +17,7 @@
CRS_APT_PACKAGE,
CRS_GITHUB_DOWNLOAD,
CRS_INSTALL_DIR,
+ CRS_RULES_REQUIRE_296,
CRS_SEARCH_PATHS,
CRS_SETUP_OVERRIDES_TEMPLATE,
EVASIVE_CONF,
@@ -36,7 +37,9 @@
MODSEC_PACKAGE,
MODSEC_TMP_DIR,
NS_EXCLUSIONS_CONF,
+ NS_EXCLUSIONS_HASH,
NS_EXCLUSIONS_TEMPLATE,
+ NS_EXCLUSIONS_VERSION,
PINNED_CRS_VERSION,
SECURITY2_CONF,
SECURITY2_CONF_TEMPLATE,
@@ -47,17 +50,26 @@
append_crs_to_security2,
backup_file,
detect_modsec_mode,
+ detect_modsec_version,
file_exists,
package_installed,
parse_security2_conf,
read_file,
render,
run_cmd,
+ version_gte,
write_file,
write_security2_full,
)
+def fetch_nodeping_probe_ips() -> tuple[list[str], str]:
+ """Fetch NodePing monitoring probe IPs for WAF allowlisting."""
+ from nssec.modules.mtls.utils import fetch_nodeping_ips
+
+ return fetch_nodeping_ips()
+
+
def get_allowlisted_ips() -> list[str]:
"""Parse allowlisted admin IPs from the deployed exclusions conf."""
content = read_file(NS_EXCLUSIONS_CONF)
@@ -82,7 +94,12 @@ def add_allowlisted_ip(ip: str) -> StepResult:
if file_exists(NS_EXCLUSIONS_CONF):
backup_file(NS_EXCLUSIONS_CONF)
- content = render(NS_EXCLUSIONS_TEMPLATE, admin_ips=new_ips)
+ content = render(
+ NS_EXCLUSIONS_TEMPLATE,
+ admin_ips=new_ips,
+ version=NS_EXCLUSIONS_VERSION,
+ template_hash=NS_EXCLUSIONS_HASH,
+ )
if not write_file(NS_EXCLUSIONS_CONF, content):
return StepResult(success=False, error=f"Failed to write {NS_EXCLUSIONS_CONF}")
@@ -100,7 +117,12 @@ def remove_allowlisted_ip(ip: str) -> StepResult:
if file_exists(NS_EXCLUSIONS_CONF):
backup_file(NS_EXCLUSIONS_CONF)
- content = render(NS_EXCLUSIONS_TEMPLATE, admin_ips=new_ips)
+ content = render(
+ NS_EXCLUSIONS_TEMPLATE,
+ admin_ips=new_ips,
+ version=NS_EXCLUSIONS_VERSION,
+ template_hash=NS_EXCLUSIONS_HASH,
+ )
if not write_file(NS_EXCLUSIONS_CONF, content):
return StepResult(success=False, error=f"Failed to write {NS_EXCLUSIONS_CONF}")
@@ -317,7 +339,12 @@ def install_crs_v4(self) -> StepResult:
has_v4 = pf.crs_installed and pf.crs_version and pf.crs_version.startswith("4")
if has_v4:
- msg = f"CRS v{pf.crs_version} already installed at {pf.crs_path}"
+ # Still update crs-setup.conf with latest template values
+ self._update_crs_setup(pf.crs_path)
+ disabled = self._disable_incompatible_crs_rules(pf.crs_path)
+ msg = f"CRS v{pf.crs_version} at {pf.crs_path} (crs-setup.conf updated)"
+ if disabled:
+ msg += f"; disabled {len(disabled)} rule(s) incompatible with ModSec < 2.9.6"
return StepResult(skipped=True, message=msg)
if self.dry_run:
@@ -351,6 +378,39 @@ def _try_crs_from_apt(self) -> StepResult | None:
return StepResult(message=msg)
return None
+ def _update_crs_setup(self, crs_path: str) -> bool:
+ """Write crs-setup.conf from template. Returns True on success."""
+ setup_content = render(
+ CRS_SETUP_OVERRIDES_TEMPLATE,
+ paranoia_level=1,
+ inbound_threshold=5,
+ outbound_threshold=4,
+ )
+ setup_path = f"{crs_path}/crs-setup.conf"
+ return write_file(setup_path, setup_content)
+
+ def _disable_incompatible_crs_rules(self, crs_path: str) -> list[str]:
+ """Disable CRS rules incompatible with the installed ModSecurity version.
+
+ On ModSecurity < 2.9.6, renames rule files in CRS_RULES_REQUIRE_296
+ from .conf to .conf.disabled to prevent Apache startup failures.
+
+ Returns list of disabled filenames (empty if >= 2.9.6 or nothing to do).
+ """
+ ver = detect_modsec_version()
+ if version_gte(ver, "2.9.6"):
+ return []
+
+ disabled: list[str] = []
+ rules_dir = Path(f"{crs_path}/rules")
+ for rule_file in CRS_RULES_REQUIRE_296:
+ src = rules_dir / rule_file
+ dst = rules_dir / (rule_file + ".disabled")
+ if src.exists() and not dst.exists():
+ src.rename(dst)
+ disabled.append(rule_file)
+ return disabled
+
def _download_crs_from_github(self) -> StepResult:
"""Download and extract CRS v4 from GitHub releases."""
tarball = f"/tmp/crs-v{PINNED_CRS_VERSION}.tar.gz"
@@ -377,21 +437,24 @@ def _download_crs_from_github(self) -> StepResult:
return StepResult(success=False, error=f"Failed to extract CRS: {stderr}")
Path(tarball).unlink(missing_ok=True)
- setup_content = render(
- CRS_SETUP_OVERRIDES_TEMPLATE,
- paranoia_level=1,
- inbound_threshold=5,
- outbound_threshold=4,
- )
- setup_path = f"{CRS_INSTALL_DIR}/crs-setup.conf"
- if not write_file(setup_path, setup_content):
- return StepResult(success=False, error=f"Failed to write {setup_path}")
+ if not self._update_crs_setup(CRS_INSTALL_DIR):
+ return StepResult(
+ success=False, error=f"Failed to write {CRS_INSTALL_DIR}/crs-setup.conf"
+ )
+
+ # Refresh preflight cache so write_security2_conf() uses the new CRS path
+ self._preflight = None
+
+ disabled = self._disable_incompatible_crs_rules(CRS_INSTALL_DIR)
msg = f"Installed CRS v{PINNED_CRS_VERSION} to {CRS_INSTALL_DIR}"
+ if disabled:
+ msg += f"; disabled {len(disabled)} rule(s) incompatible with ModSec < 2.9.6"
return StepResult(message=msg)
def install_exclusions(
self,
admin_ips: list[str] | None = None,
+ nodeping_ips: list[str] | None = None,
) -> StepResult:
"""Write NetSapiens-specific ModSecurity exclusions."""
if self.dry_run:
@@ -400,7 +463,13 @@ def install_exclusions(
if file_exists(NS_EXCLUSIONS_CONF):
backup_file(NS_EXCLUSIONS_CONF)
- content = render(NS_EXCLUSIONS_TEMPLATE, admin_ips=admin_ips or [])
+ content = render(
+ NS_EXCLUSIONS_TEMPLATE,
+ admin_ips=admin_ips or [],
+ nodeping_ips=nodeping_ips or [],
+ version=NS_EXCLUSIONS_VERSION,
+ template_hash=NS_EXCLUSIONS_HASH,
+ )
if not write_file(NS_EXCLUSIONS_CONF, content):
return StepResult(
success=False,
@@ -500,7 +569,11 @@ def verify(self) -> list[StepResult]:
# Full install orchestration
# ------------------------------------------------------------------
- def run(self, admin_ips: list[str] | None = None) -> InstallResult:
+ def run(
+ self,
+ admin_ips: list[str] | None = None,
+ nodeping_ips: list[str] | None = None,
+ ) -> InstallResult:
"""Run the full installation sequence."""
result = InstallResult(mode=self.mode)
pf = self.preflight()
@@ -519,7 +592,10 @@ def run(self, admin_ips: list[str] | None = None) -> InstallResult:
("Configure mod_evasive", self.setup_evasive_config),
("Enable mod_evasive", lambda: self.set_evasive_state(True)),
("Install OWASP CRS v4", self.install_crs_v4),
- ("Install NS exclusions", lambda: self.install_exclusions(admin_ips)),
+ (
+ "Install NS exclusions",
+ lambda: self.install_exclusions(admin_ips, nodeping_ips),
+ ),
("Update security2.conf", self.write_security2_conf),
("Validate Apache config", self.validate_config),
]
@@ -582,3 +658,21 @@ def set_mode(self, mode: str) -> StepResult:
msg = f"SecRuleEngine set to {mode} and Apache reloaded"
return StepResult(message=msg)
+
+ def _reenable_crs_rules(self, crs_path: str) -> list[str]:
+ """Re-enable CRS rules previously disabled for ModSec < 2.9.6.
+
+ Renames .conf.disabled files back to .conf for rules in
+ CRS_RULES_REQUIRE_296.
+
+ Returns list of re-enabled filenames.
+ """
+ reenabled: list[str] = []
+ rules_dir = Path(f"{crs_path}/rules")
+ for rule_file in CRS_RULES_REQUIRE_296:
+ disabled = rules_dir / (rule_file + ".disabled")
+ target = rules_dir / rule_file
+ if disabled.exists() and not target.exists():
+ disabled.rename(target)
+ reenabled.append(rule_file)
+ return reenabled
diff --git a/src/nssec/modules/waf/config.py b/src/nssec/modules/waf/config.py
index 4db0dbe..38b6073 100644
--- a/src/nssec/modules/waf/config.py
+++ b/src/nssec/modules/waf/config.py
@@ -34,6 +34,16 @@
# CRS version pinning (used when apt ships v3.x)
PINNED_CRS_VERSION = "4.8.0"
+
+# CRS rule files that require ModSecurity >= 2.9.6 (MULTIPART_PART_HEADERS).
+# These are renamed to .conf.disabled on older ModSecurity versions.
+CRS_RULES_REQUIRE_296 = [
+ "REQUEST-922-MULTIPART-ATTACK.conf",
+]
+DIGITALWAVE_REPO_URL = "http://modsecurity.digitalwave.hu/ubuntu/"
+DIGITALWAVE_KEY_URL = "https://modsecurity.digitalwave.hu/archive.key"
+DIGITALWAVE_KEYRING = "/usr/share/keyrings/digitalwave-modsecurity.gpg"
+DIGITALWAVE_LIST = "/etc/apt/sources.list.d/digitalwave-modsecurity.list"
CRS_GITHUB_DOWNLOAD = (
f"https://github.com/coreruleset/coreruleset/archive/refs/tags/v{PINNED_CRS_VERSION}.tar.gz"
)
@@ -50,16 +60,20 @@
MODSEC_TMP_DIR = "/tmp/"
MODSEC_DATA_DIR = "/tmp/"
-# CRS locations to check (apt-installed or manual)
+# CRS locations to check — nssec-managed path first so v4 is preferred
+# over the apt v3 package when both exist.
CRS_SEARCH_PATHS = [
- "/usr/share/modsecurity-crs",
"/etc/modsecurity/crs",
+ "/usr/share/modsecurity-crs",
"/etc/apache2/modsecurity-crs",
]
# Backup suffix for nssec-managed files
BACKUP_SUFFIX = ".bak.nssec"
+# Exclusions template version — human-readable label for the template revision.
+NS_EXCLUSIONS_VERSION = "2"
+
# ---------------------------------------------------------------------------
# Jinja2 Templates
# ---------------------------------------------------------------------------
@@ -188,6 +202,8 @@
# NetSapiens-specific ModSecurity Exclusions
# Managed by nssec
# Generated: {{ timestamp }}
+# nssec-exclusions-version: {{ version }}
+# nssec-exclusions-hash: {{ template_hash }}
#
# These rules prevent false positives on the NetSapiens management UI
# and API endpoints while keeping CRS protection active for everything else.
@@ -276,6 +292,20 @@
ctl:ruleRemoveByTag=OWASP_CRS"
{% endfor %}
{% endif %}
+
+{% if nodeping_ips %}
+# ---- NodePing monitoring probe IPs ----
+# Health check probes should bypass CRS to avoid false positives.
+# Source: https://nodeping.com/content/txt/pinghosts.txt
+{% for ip in nodeping_ips %}
+SecRule REMOTE_ADDR "@ipMatch {{ ip }}" \\
+ "id:{{ 1000200 + loop.index }},\\
+ phase:1,\\
+ pass,\\
+ nolog,\\
+ ctl:ruleRemoveByTag=OWASP_CRS"
+{% endfor %}
+{% endif %}
"""
CRS_SETUP_OVERRIDES_TEMPLATE = """\
@@ -444,3 +474,18 @@
DOSWhitelist 192.168.*.*
"""
+
+
+def _exclusions_template_hash() -> str:
+ """Compute MD5 hash of NS_EXCLUSIONS_TEMPLATE source.
+
+ Any change to the template (new rules, modified rules, structural changes)
+ automatically produces a different hash. Deployed files embed this hash
+ so 'waf status' can detect drift without manual version bumping.
+ """
+ import hashlib
+
+ return hashlib.md5(NS_EXCLUSIONS_TEMPLATE.encode()).hexdigest()[:12]
+
+
+NS_EXCLUSIONS_HASH = _exclusions_template_hash()
diff --git a/src/nssec/modules/waf/status.py b/src/nssec/modules/waf/status.py
index 22d089d..72f02f1 100644
--- a/src/nssec/modules/waf/status.py
+++ b/src/nssec/modules/waf/status.py
@@ -2,11 +2,13 @@
from __future__ import annotations
+import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
from nssec.modules.waf.config import (
+ CRS_RULES_REQUIRE_296,
CRS_SEARCH_PATHS,
EVASIVE_LOAD,
EVASIVE_PACKAGE,
@@ -14,6 +16,9 @@
MODSEC_CONF,
MODSEC_PACKAGE,
NS_EXCLUSIONS_CONF,
+ NS_EXCLUSIONS_HASH,
+ NS_EXCLUSIONS_VERSION,
+ SECURITY2_CONF,
SECURITY2_LOAD,
)
@@ -22,15 +27,26 @@
class WafStatus:
"""Current state of ModSecurity / CRS."""
+ apache_version: Optional[str] = None
+ apache_ppa: bool = False
modsec_installed: bool = False
modsec_enabled: bool = False
modsec_mode: Optional[str] = None
crs_installed: bool = False
crs_version: Optional[str] = None
crs_path: Optional[str] = None
+ crs_setup_present: bool = False
evasive_installed: bool = False
evasive_enabled: bool = False
exclusions_present: bool = False
+ exclusions_version: Optional[str] = None
+ exclusions_current: bool = False
+ exclusions_included: bool = False
+ crs_path_valid: bool = False
+ exclusions_admin_ips: int = 0
+ exclusions_nodeping_ips: int = 0
+ modsec_version: Optional[str] = None
+ disabled_crs_rules: int = 0
audit_log_exists: bool = False
recent_log_lines: list[str] = field(default_factory=list)
@@ -50,6 +66,36 @@ def _pkg_installed(package: str) -> bool:
return False
+def _get_pkg_version(package: str) -> Optional[str]:
+ """Get the upstream version of an installed deb package."""
+ import subprocess
+
+ try:
+ result = subprocess.run(
+ ["dpkg-query", "-W", "-f=${Version}", package],
+ capture_output=True,
+ text=True,
+ timeout=10,
+ )
+ if result.returncode != 0 or not result.stdout.strip():
+ return None
+ # Strip Debian suffix (e.g. "2.9.5-3" → "2.9.5")
+ return result.stdout.strip().split("-")[0]
+ except (subprocess.TimeoutExpired, FileNotFoundError):
+ return None
+
+
+def _is_ondrej_apache_ppa() -> bool:
+ """Check whether the ondrej/apache2 PPA is configured."""
+ import glob
+
+ patterns = [
+ "/etc/apt/sources.list.d/ondrej-ubuntu-apache2-*",
+ "/etc/apt/sources.list.d/ondrej-*apache2*",
+ ]
+ return any(glob.glob(p) for p in patterns)
+
+
def _read_file(path: str) -> Optional[str]:
try:
return Path(path).read_text()
@@ -68,11 +114,37 @@ def _tail_file(path: str, lines: int = 10) -> list[str]:
return []
+def _parse_security2_crs_path(content: str) -> Optional[str]:
+ """Extract the CRS path referenced in security2.conf."""
+ match = re.search(r"IncludeOptional\s+(\S+)/crs-setup\.conf", content)
+ if match:
+ return match.group(1)
+ return None
+
+
+def _parse_exclusions_meta(content: str) -> tuple[Optional[str], Optional[str], int, int]:
+ """Parse exclusions file for version, hash, admin IP count, NodePing IP count."""
+ version = None
+ template_hash = None
+ for line in content.splitlines():
+ if line.startswith("# nssec-exclusions-version:"):
+ version = line.split(":", 1)[1].strip()
+ elif line.startswith("# nssec-exclusions-hash:"):
+ template_hash = line.split(":", 1)[1].strip()
+
+ admin_ips = len(re.findall(r'"id:10001\d+', content))
+ nodeping_ips = len(re.findall(r'"id:10002\d+', content))
+ return version, template_hash, admin_ips, nodeping_ips
+
+
def get_waf_status() -> WafStatus:
"""Collect comprehensive WAF status information."""
status = WafStatus()
+ status.apache_version = _get_pkg_version("apache2")
+ status.apache_ppa = _is_ondrej_apache_ppa()
status.modsec_installed = _pkg_installed(MODSEC_PACKAGE)
+ status.modsec_version = _get_pkg_version(MODSEC_PACKAGE)
status.modsec_enabled = Path(SECURITY2_LOAD).exists()
# Detect mode
@@ -93,12 +165,53 @@ def get_waf_status() -> WafStatus:
version_file = Path(search_path) / "VERSION"
if version_file.exists():
status.crs_version = version_file.read_text().strip()
+ # Check crs-setup.conf exists at this path
+ status.crs_setup_present = (Path(search_path) / "crs-setup.conf").exists()
+ # Count disabled CRS rules (files renamed .conf.disabled for ModSec compat)
+ rules_dir = Path(search_path) / "rules"
+ if rules_dir.is_dir():
+ status.disabled_crs_rules = sum(
+ 1 for f in CRS_RULES_REQUIRE_296
+ if (rules_dir / (f + ".disabled")).exists()
+ )
break
+ # Check security2.conf references the correct CRS path and includes exclusions
+ sec2_content = _read_file(SECURITY2_CONF)
+ if sec2_content:
+ sec2_crs_path = _parse_security2_crs_path(sec2_content)
+
+ # Exclusions are included if security2.conf either:
+ # 1. Explicitly includes the exclusions file path, OR
+ # 2. Uses a wildcard IncludeOptional /etc/modsecurity/*.conf
+ # (the default Debian config) which picks up all .conf in that dir
+ has_explicit = NS_EXCLUSIONS_CONF in sec2_content
+ has_wildcard = "/etc/modsecurity/*.conf" in sec2_content
+ status.exclusions_included = has_explicit or has_wildcard
+
+ status.crs_path_valid = (
+ sec2_crs_path is not None
+ and Path(sec2_crs_path).is_dir()
+ and (Path(sec2_crs_path) / "crs-setup.conf").exists()
+ )
+
status.evasive_installed = _pkg_installed(EVASIVE_PACKAGE)
status.evasive_enabled = Path(EVASIVE_LOAD).exists()
+ # Parse exclusions file
status.exclusions_present = Path(NS_EXCLUSIONS_CONF).exists()
+ if status.exclusions_present:
+ excl_content = _read_file(NS_EXCLUSIONS_CONF)
+ if excl_content:
+ version, deployed_hash, admin_count, np_count = _parse_exclusions_meta(
+ excl_content
+ )
+ status.exclusions_version = version
+ # Use hash for drift detection — automatically catches any template change
+ status.exclusions_current = deployed_hash == NS_EXCLUSIONS_HASH
+ status.exclusions_admin_ips = admin_count
+ status.exclusions_nodeping_ips = np_count
+
status.audit_log_exists = Path(MODSEC_AUDIT_LOG).exists()
if status.audit_log_exists:
status.recent_log_lines = _tail_file(MODSEC_AUDIT_LOG, 10)
diff --git a/src/nssec/modules/waf/utils.py b/src/nssec/modules/waf/utils.py
index 7fdf72a..a2c8033 100644
--- a/src/nssec/modules/waf/utils.py
+++ b/src/nssec/modules/waf/utils.py
@@ -68,6 +68,51 @@ def render(template_str: str, **kwargs: object) -> str:
)
+def _parse_version(s: str) -> tuple[int, ...]:
+ """Parse a version string into an int tuple, stripping pre-release suffixes.
+
+ Handles versions like "2.9.13~pre", "2.9.5-1", "2.9.6".
+ """
+ import re
+
+ parts = []
+ for component in s.split("."):
+ # Extract leading digits, ignore suffixes like ~pre, -rc1, etc.
+ m = re.match(r"(\d+)", component)
+ if m:
+ parts.append(int(m.group(1)))
+ else:
+ raise ValueError(f"Cannot parse version component: {component}")
+ return tuple(parts)
+
+
+def version_gte(version_str: str | None, target: str) -> bool:
+ """Compare version strings using tuple comparison.
+
+ Returns True if version_str >= target. Returns False on parse errors.
+ Handles pre-release suffixes like ~pre, -rc1, etc.
+ """
+ try:
+ v = _parse_version(version_str)
+ t = _parse_version(target)
+ return v >= t
+ except (ValueError, AttributeError, TypeError):
+ return False
+
+
+def detect_modsec_version() -> str | None:
+ """Detect installed ModSecurity version from dpkg metadata.
+
+ Returns version string (e.g. "2.9.5") or None if not installed.
+ """
+ stdout, _, rc = run_cmd(["dpkg-query", "-W", "-f=${Version}", "libapache2-mod-security2"])
+ if rc != 0 or not stdout.strip():
+ return None
+ # dpkg version may have suffixes like "2.9.5-3" — keep only the upstream part
+ version = stdout.strip().split("-")[0]
+ return version
+
+
def detect_modsec_mode(config_paths: list[str]) -> str | None:
"""Read SecRuleEngine mode from the first config that has it."""
for config_path in config_paths:
@@ -105,10 +150,30 @@ def parse_security2_conf(path: str) -> tuple[bool, bool]:
def append_crs_to_security2(crs_path: str) -> bool:
"""Append CRS IncludeOptional directives to an existing security2.conf.
+ Comments out any existing CRS load lines (e.g., apt v3
+ ``IncludeOptional /usr/share/modsecurity-crs/*.load``) to prevent
+ dual-loading when nssec manages CRS v4 separately.
+
Returns True on success, False on write failure.
"""
sec2_content = read_file(SECURITY2_CONF) or ""
backup_file(SECURITY2_CONF)
+
+ # Comment out old CRS include lines to prevent dual-loading.
+ new_lines = []
+ for line in sec2_content.splitlines():
+ stripped = line.strip()
+ if (
+ not stripped.startswith("#")
+ and "IncludeOptional" in stripped
+ and "modsecurity-crs" in stripped
+ ):
+ new_lines.append(" # Disabled by nssec (CRS v4 managed below)")
+ new_lines.append(f" # {stripped}")
+ else:
+ new_lines.append(line)
+ sec2_content = "\n".join(new_lines)
+
crs_block = (
f"\n # OWASP CRS (added by nssec)\n"
f" IncludeOptional {crs_path}/crs-setup.conf\n"
diff --git a/tests/unit/test_waf_commands.py b/tests/unit/test_waf_commands.py
index 2041709..28bb500 100644
--- a/tests/unit/test_waf_commands.py
+++ b/tests/unit/test_waf_commands.py
@@ -324,3 +324,165 @@ def test_default_subcommand_shows_status(self, runner):
assert result.exit_code == 0
assert "mod_evasive Status" in result.output
+
+
+class TestWafInitNodeping:
+ """Tests for waf init command with NodePing IP fetching."""
+
+ def test_fetches_nodeping_ips_during_init(self, runner, mock_installer):
+ """Should fetch NodePing IPs and pass to run()."""
+ pf = mock_installer.preflight.return_value
+ pf.can_proceed = True
+ pf.crs_installed = True
+ pf.crs_version = "4.8.0"
+ pf.warnings = []
+
+ run_result = MagicMock()
+ run_result.success = True
+ run_result.steps_completed = ["Done"]
+ run_result.steps_skipped = []
+ run_result.warnings = []
+ run_result.errors = []
+ mock_installer.run.return_value = run_result
+ mock_installer.reload_apache.return_value = MagicMock(success=True, message="Reloaded")
+
+ with patch(
+ "nssec.modules.waf.fetch_nodeping_probe_ips",
+ return_value=(["52.71.195.82"], ""),
+ ):
+ result = runner.invoke(waf, ["init", "-y"])
+
+ assert result.exit_code == 0
+ # Verify NodePing IPs were passed to run()
+ mock_installer.run.assert_called_once()
+ call_kwargs = mock_installer.run.call_args
+ assert call_kwargs.kwargs.get("nodeping_ips") == ["52.71.195.82"]
+
+ def test_warns_on_nodeping_fetch_failure(self, runner, mock_installer):
+ """Should warn but continue when NodePing fetch fails."""
+ pf = mock_installer.preflight.return_value
+ pf.can_proceed = True
+ pf.crs_installed = True
+ pf.crs_version = "4.8.0"
+ pf.warnings = []
+
+ run_result = MagicMock()
+ run_result.success = True
+ run_result.steps_completed = ["Done"]
+ run_result.steps_skipped = []
+ run_result.warnings = []
+ run_result.errors = []
+ mock_installer.run.return_value = run_result
+ mock_installer.reload_apache.return_value = MagicMock(success=True, message="Reloaded")
+
+ with patch(
+ "nssec.modules.waf.fetch_nodeping_probe_ips",
+ return_value=([], "Connection error"),
+ ):
+ result = runner.invoke(waf, ["init", "-y"])
+
+ assert result.exit_code == 0
+ assert "Warning" in result.output
+
+
+class TestWafUpdateExclusionsNodeping:
+ """Tests for waf update-exclusions command with NodePing IPs."""
+
+ def test_fetches_nodeping_ips_during_update(self, runner, mock_installer):
+ """Should fetch NodePing IPs and pass to install_exclusions()."""
+ step = MagicMock()
+ step.success = True
+ step.message = "Updated"
+ mock_installer.install_exclusions.return_value = step
+ mock_installer.validate_config.return_value = step
+ mock_installer.reload_apache.return_value = step
+
+ with patch(
+ "nssec.modules.waf.fetch_nodeping_probe_ips",
+ return_value=(["52.71.195.82", "3.21.118.250"], ""),
+ ):
+ result = runner.invoke(waf, ["update-exclusions", "-y"])
+
+ assert result.exit_code == 0
+ mock_installer.install_exclusions.assert_called_once()
+ call_kwargs = mock_installer.install_exclusions.call_args
+ assert call_kwargs.kwargs.get("nodeping_ips") == ["52.71.195.82", "3.21.118.250"]
+
+ def test_warns_on_nodeping_fetch_failure_update(self, runner, mock_installer):
+ """Should warn but continue when NodePing fetch fails during update."""
+ step = MagicMock()
+ step.success = True
+ step.message = "Updated"
+ mock_installer.install_exclusions.return_value = step
+ mock_installer.validate_config.return_value = step
+ mock_installer.reload_apache.return_value = step
+
+ with patch(
+ "nssec.modules.waf.fetch_nodeping_probe_ips",
+ return_value=([], "Timeout"),
+ ):
+ result = runner.invoke(waf, ["update-exclusions", "-y"])
+
+ assert result.exit_code == 0
+ assert "Warning" in result.output
+
+
+class TestWafUpdate:
+ """Tests for waf update command."""
+
+ def test_requires_root(self, runner, mock_installer):
+ """Should fail if not root."""
+ mock_installer.preflight.return_value.is_root = False
+
+ with patch("nssec.modules.waf.utils.detect_modsec_version", return_value="2.9.5"), \
+ patch("nssec.modules.waf.utils.version_gte", return_value=False):
+ result = runner.invoke(waf, ["update", "-y"])
+
+ assert result.exit_code == 1
+ assert "root" in result.output.lower()
+
+ def test_shows_instructions_when_old(self, runner, mock_installer):
+ """Should show Digitalwave repo instructions when ModSec < 2.9.6."""
+ with patch("nssec.modules.waf.utils.detect_modsec_version", return_value="2.9.5"), \
+ patch("nssec.modules.waf.utils.version_gte", return_value=False):
+ result = runner.invoke(waf, ["update", "-y"])
+
+ assert result.exit_code == 0
+ assert "Digitalwave" in result.output
+ assert "signed-by=" in result.output
+ assert "apt-get update" in result.output
+
+ def test_nothing_to_do_when_current_no_disabled(self, runner, mock_installer):
+ """Should report nothing to do when >= 2.9.6 and no disabled rules."""
+ mock_installer._reenable_crs_rules.return_value = []
+ mock_installer.preflight.return_value.crs_path = "/etc/modsecurity/crs"
+
+ with patch("nssec.modules.waf.utils.detect_modsec_version", return_value="2.9.7"), \
+ patch("nssec.modules.waf.utils.version_gte", return_value=True):
+ result = runner.invoke(waf, ["update", "-y"])
+
+ assert result.exit_code == 0
+ assert "Nothing to do" in result.output
+
+ def test_reenables_rules_after_upgrade(self, runner, mock_installer):
+ """Should re-enable disabled rules when ModSec >= 2.9.6."""
+ mock_installer._reenable_crs_rules.return_value = ["REQUEST-922-MULTIPART-ATTACK.conf"]
+ mock_installer.preflight.return_value.crs_path = "/etc/modsecurity/crs"
+
+ validate_result = MagicMock()
+ validate_result.success = True
+ validate_result.message = "Apache config test passed"
+ mock_installer.validate_config.return_value = validate_result
+
+ reload_result = MagicMock()
+ reload_result.success = True
+ reload_result.message = "Apache reloaded"
+ mock_installer.reload_apache.return_value = reload_result
+
+ with patch("nssec.modules.waf.utils.detect_modsec_version", return_value="2.9.7"), \
+ patch("nssec.modules.waf.utils.version_gte", return_value=True):
+ result = runner.invoke(waf, ["update", "-y"])
+
+ assert result.exit_code == 0
+ mock_installer._reenable_crs_rules.assert_called_once()
+ assert "Re-enabled" in result.output
diff --git a/tests/unit/test_waf_module.py b/tests/unit/test_waf_module.py
index 8fd8095..703b88c 100644
--- a/tests/unit/test_waf_module.py
+++ b/tests/unit/test_waf_module.py
@@ -1,7 +1,8 @@
"""Tests for WAF module functions."""
import pytest
-from unittest.mock import patch, MagicMock, call
+from pathlib import Path
+from unittest.mock import patch, MagicMock, call, ANY
from jinja2 import Template
@@ -425,3 +426,337 @@ def test_detect_mode_does_not_toggle_evasive(self, mock_file_ops):
assert not any("evasive" in c for c in run_calls)
# Message should NOT mention mod_evasive
assert "mod_evasive" not in result.message
+
+
+class TestFetchNodepingProbeIps:
+ """Tests for fetch_nodeping_probe_ips function."""
+
+ def test_returns_ips_from_mtls_util(self):
+ """Should delegate to mTLS fetch_nodeping_ips utility."""
+ from nssec.modules.waf import fetch_nodeping_probe_ips
+
+ with patch(
+ "nssec.modules.mtls.utils.fetch_nodeping_ips",
+ return_value=(["52.71.195.82", "3.21.118.250"], ""),
+ ):
+ ips, err = fetch_nodeping_probe_ips()
+
+ assert ips == ["52.71.195.82", "3.21.118.250"]
+ assert err == ""
+
+ def test_returns_error_on_failure(self):
+ """Should propagate error from mTLS utility."""
+ from nssec.modules.waf import fetch_nodeping_probe_ips
+
+ with patch(
+ "nssec.modules.mtls.utils.fetch_nodeping_ips",
+ return_value=([], "Failed to fetch NodePing IPs: connection error"),
+ ):
+ ips, err = fetch_nodeping_probe_ips()
+
+ assert ips == []
+ assert "Failed to fetch" in err
+
+
+class TestInstallExclusionsWithNodeping:
+ """Tests for install_exclusions with nodeping_ips parameter."""
+
+ def test_passes_nodeping_ips_to_template(self, mock_file_ops):
+ """Should pass nodeping_ips to the exclusions template."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ installer = ModSecurityInstaller()
+ nodeping = ["52.71.195.82", "3.21.118.250"]
+ result = installer.install_exclusions(nodeping_ips=nodeping)
+
+ assert result.success
+ render_call = mock_file_ops["render"].call_args
+ assert render_call[1].get("nodeping_ips") == nodeping or \
+ nodeping in render_call[0] if render_call[0] else False
+
+ def test_defaults_to_empty_nodeping_list(self, mock_file_ops):
+ """Should default to empty list when no nodeping_ips provided."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ installer = ModSecurityInstaller()
+ result = installer.install_exclusions()
+
+ assert result.success
+ render_call = mock_file_ops["render"].call_args
+ # nodeping_ips should be an empty list
+ assert render_call[1].get("nodeping_ips") == [] or \
+ render_call.kwargs.get("nodeping_ips") == []
+
+ def test_passes_both_admin_and_nodeping_ips(self, mock_file_ops):
+ """Should pass both admin_ips and nodeping_ips to template."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ installer = ModSecurityInstaller()
+ admin = ["192.168.1.100"]
+ nodeping = ["52.71.195.82"]
+ result = installer.install_exclusions(admin_ips=admin, nodeping_ips=nodeping)
+
+ assert result.success
+ render_call = mock_file_ops["render"].call_args
+ assert render_call.kwargs.get("admin_ips") == admin
+ assert render_call.kwargs.get("nodeping_ips") == nodeping
+
+
+class TestNodepingExclusionsTemplate:
+ """Tests for NS_EXCLUSIONS_TEMPLATE with NodePing IPs."""
+
+ def test_renders_nodeping_section(self):
+ """Should render NodePing IP rules in the exclusions template."""
+ from nssec.modules.waf.config import NS_EXCLUSIONS_TEMPLATE
+
+ rendered = Template(NS_EXCLUSIONS_TEMPLATE).render(
+ timestamp="test",
+ admin_ips=[],
+ nodeping_ips=["52.71.195.82", "3.21.118.250"],
+ )
+ assert "NodePing monitoring probe IPs" in rendered
+ assert "52.71.195.82" in rendered
+ assert "3.21.118.250" in rendered
+ assert "id:1000201" in rendered
+ assert "id:1000202" in rendered
+
+ def test_omits_nodeping_section_when_empty(self):
+ """Should not render NodePing section when list is empty."""
+ from nssec.modules.waf.config import NS_EXCLUSIONS_TEMPLATE
+
+ rendered = Template(NS_EXCLUSIONS_TEMPLATE).render(
+ timestamp="test",
+ admin_ips=[],
+ nodeping_ips=[],
+ )
+ assert "NodePing" not in rendered
+
+ def test_nodeping_rule_ids_separate_from_admin(self):
+ """NodePing rules should use 1000200+ range, admin uses 1000100+."""
+ from nssec.modules.waf.config import NS_EXCLUSIONS_TEMPLATE
+
+ rendered = Template(NS_EXCLUSIONS_TEMPLATE).render(
+ timestamp="test",
+ admin_ips=["192.168.1.100"],
+ nodeping_ips=["52.71.195.82"],
+ )
+ assert "id:1000101" in rendered # admin IP
+ assert "id:1000201" in rendered # NodePing IP
+
+ def test_nodeping_rules_bypass_crs(self):
+ """NodePing rules should bypass all CRS rules."""
+ from nssec.modules.waf.config import NS_EXCLUSIONS_TEMPLATE
+
+ rendered = Template(NS_EXCLUSIONS_TEMPLATE).render(
+ timestamp="test",
+ admin_ips=[],
+ nodeping_ips=["52.71.195.82"],
+ )
+ # Find the NodePing rule section
+ lines = rendered.split("\n")
+ nodeping_rule_lines = [l for l in lines if "1000201" in l]
+ assert len(nodeping_rule_lines) > 0
+ assert "ruleRemoveByTag=OWASP_CRS" in rendered
+
+
+class TestInstallCrsV4UpdatesSetup:
+ """Tests for install_crs_v4 updating crs-setup.conf when v4 already present."""
+
+ def test_updates_crs_setup_when_v4_present(self, mock_file_ops):
+ """Should update crs-setup.conf even when CRS v4 is already installed."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ installer = ModSecurityInstaller()
+ pf = MagicMock()
+ pf.crs_installed = True
+ pf.crs_version = "4.8.0"
+ pf.crs_path = "/etc/modsecurity/crs"
+ pf.can_proceed = True
+ installer._preflight = pf
+
+ with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.7"), \
+ patch("nssec.modules.waf.version_gte", return_value=True):
+ result = installer.install_crs_v4()
+
+ assert result.skipped
+ assert "crs-setup.conf updated" in result.message
+ # Should have called write_file for crs-setup.conf
+ mock_file_ops["write"].assert_called_once()
+ write_args = mock_file_ops["write"].call_args
+ assert "crs-setup.conf" in write_args[0][0]
+
+ def test_skips_setup_update_renders_template(self, mock_file_ops):
+ """Should render the CRS_SETUP_OVERRIDES_TEMPLATE when updating."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ installer = ModSecurityInstaller()
+ pf = MagicMock()
+ pf.crs_installed = True
+ pf.crs_version = "4.8.0"
+ pf.crs_path = "/etc/modsecurity/crs"
+ pf.can_proceed = True
+ installer._preflight = pf
+
+ with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.7"), \
+ patch("nssec.modules.waf.version_gte", return_value=True):
+ installer.install_crs_v4()
+
+ # render should have been called for crs-setup.conf
+ mock_file_ops["render"].assert_called_once()
+
+
+class TestPreflightCacheRefresh:
+ """Tests for preflight cache being cleared after CRS download."""
+
+ def test_clears_preflight_after_download(self, mock_file_ops):
+ """Should clear _preflight cache after successful CRS download."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ with patch("nssec.modules.waf.run_cmd", return_value=("", "", 0)), \
+ patch("nssec.modules.waf.Path"):
+ installer = ModSecurityInstaller()
+ pf = MagicMock()
+ pf.crs_installed = False
+ pf.crs_version = None
+ pf.crs_path = None
+ pf.can_proceed = True
+ installer._preflight = pf
+
+ result = installer._download_crs_from_github()
+
+ assert result.success
+ assert installer._preflight is None
+
+ def test_does_not_clear_preflight_on_download_failure(self, mock_file_ops):
+ """Should not clear preflight cache if download fails."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ with patch("nssec.modules.waf.run_cmd", return_value=("", "error", 1)):
+ installer = ModSecurityInstaller()
+ pf = MagicMock()
+ installer._preflight = pf
+
+ result = installer._download_crs_from_github()
+
+ assert not result.success
+ assert installer._preflight is pf # unchanged
+
+
+class TestDisableIncompatibleCrsRules:
+ """Tests for ModSecurityInstaller._disable_incompatible_crs_rules."""
+
+ def test_disables_rules_on_old_modsec(self, tmp_path, mock_file_ops):
+ """Should rename .conf to .conf.disabled when ModSec < 2.9.6."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ rules_dir = tmp_path / "rules"
+ rules_dir.mkdir()
+ rule_file = rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf"
+ rule_file.write_text("# rule content")
+
+ with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.5"), \
+ patch("nssec.modules.waf.version_gte", side_effect=lambda v, t: False):
+ installer = ModSecurityInstaller()
+ disabled = installer._disable_incompatible_crs_rules(str(tmp_path))
+
+ assert disabled == ["REQUEST-922-MULTIPART-ATTACK.conf"]
+ assert (rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf.disabled").exists()
+ assert not rule_file.exists()
+
+ def test_skips_on_new_modsec(self, tmp_path, mock_file_ops):
+ """Should not disable rules when ModSec >= 2.9.6."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ rules_dir = tmp_path / "rules"
+ rules_dir.mkdir()
+ rule_file = rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf"
+ rule_file.write_text("# rule content")
+
+ with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.6"), \
+ patch("nssec.modules.waf.version_gte", side_effect=lambda v, t: True):
+ installer = ModSecurityInstaller()
+ disabled = installer._disable_incompatible_crs_rules(str(tmp_path))
+
+ assert disabled == []
+ assert rule_file.exists()
+
+ def test_skips_already_disabled(self, tmp_path, mock_file_ops):
+ """Should not rename if .conf.disabled already exists."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ rules_dir = tmp_path / "rules"
+ rules_dir.mkdir()
+ # Both files exist — should not touch them
+ (rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf").write_text("# rule")
+ (rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf.disabled").write_text("# disabled")
+
+ with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.5"), \
+ patch("nssec.modules.waf.version_gte", side_effect=lambda v, t: False):
+ installer = ModSecurityInstaller()
+ disabled = installer._disable_incompatible_crs_rules(str(tmp_path))
+
+ assert disabled == []
+
+ def test_handles_missing_rule_file(self, tmp_path, mock_file_ops):
+ """Should handle case where rule file doesn't exist."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ rules_dir = tmp_path / "rules"
+ rules_dir.mkdir()
+ # No rule files created
+
+ with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.5"), \
+ patch("nssec.modules.waf.version_gte", side_effect=lambda v, t: False):
+ installer = ModSecurityInstaller()
+ disabled = installer._disable_incompatible_crs_rules(str(tmp_path))
+
+ assert disabled == []
+
+
+class TestReenableCrsRules:
+ """Tests for ModSecurityInstaller._reenable_crs_rules."""
+
+ def test_reenables_disabled_rules(self, tmp_path, mock_file_ops):
+ """Should rename .conf.disabled back to .conf."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ rules_dir = tmp_path / "rules"
+ rules_dir.mkdir()
+ disabled_file = rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf.disabled"
+ disabled_file.write_text("# rule content")
+
+ installer = ModSecurityInstaller()
+ reenabled = installer._reenable_crs_rules(str(tmp_path))
+
+ assert reenabled == ["REQUEST-922-MULTIPART-ATTACK.conf"]
+ assert (rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf").exists()
+ assert not disabled_file.exists()
+
+ def test_skips_when_target_exists(self, tmp_path, mock_file_ops):
+ """Should not rename if .conf already exists (avoid overwrite)."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ rules_dir = tmp_path / "rules"
+ rules_dir.mkdir()
+ (rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf").write_text("# active")
+ (rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf.disabled").write_text("# old")
+
+ installer = ModSecurityInstaller()
+ reenabled = installer._reenable_crs_rules(str(tmp_path))
+
+ assert reenabled == []
+
+ def test_returns_empty_when_nothing_disabled(self, tmp_path, mock_file_ops):
+ """Should return empty list when no disabled files found."""
+ from nssec.modules.waf import ModSecurityInstaller
+
+ rules_dir = tmp_path / "rules"
+ rules_dir.mkdir()
+ (rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf").write_text("# active")
+
+ installer = ModSecurityInstaller()
+ reenabled = installer._reenable_crs_rules(str(tmp_path))
+
+ assert reenabled == []
+
+
diff --git a/tests/unit/test_waf_status.py b/tests/unit/test_waf_status.py
new file mode 100644
index 0000000..e04bb15
--- /dev/null
+++ b/tests/unit/test_waf_status.py
@@ -0,0 +1,133 @@
+"""Tests for WAF status reporting."""
+
+import pytest
+from unittest.mock import patch, MagicMock
+from jinja2 import Template
+
+
+class TestParseExclusionsMeta:
+ """Tests for _parse_exclusions_meta."""
+
+ def test_parses_version_and_hash(self):
+ from nssec.modules.waf.status import _parse_exclusions_meta
+
+ content = """\
+# nssec-exclusions-version: 2
+# nssec-exclusions-hash: abc123def456
+SecRule REMOTE_ADDR "@ipMatch 192.168.1.1" "id:1000101,phase:1"
+SecRule REMOTE_ADDR "@ipMatch 52.71.195.82" "id:1000201,phase:1"
+"""
+ version, template_hash, admin, nodeping = _parse_exclusions_meta(content)
+ assert version == "2"
+ assert template_hash == "abc123def456"
+ assert admin == 1
+ assert nodeping == 1
+
+ def test_counts_multiple_ips(self):
+ from nssec.modules.waf.status import _parse_exclusions_meta
+
+ content = """\
+# nssec-exclusions-version: 2
+# nssec-exclusions-hash: abc123
+"id:1000101,phase:1"
+"id:1000102,phase:1"
+"id:1000201,phase:1"
+"id:1000202,phase:1"
+"id:1000203,phase:1"
+"""
+ version, _, admin, nodeping = _parse_exclusions_meta(content)
+ assert admin == 2
+ assert nodeping == 3
+
+ def test_handles_missing_version_and_hash(self):
+ from nssec.modules.waf.status import _parse_exclusions_meta
+
+ content = "# Old exclusions file without version\nSecRule something"
+ version, template_hash, admin, nodeping = _parse_exclusions_meta(content)
+ assert version is None
+ assert template_hash is None
+ assert admin == 0
+ assert nodeping == 0
+
+
+class TestParseSecurity2CrsPath:
+ """Tests for _parse_security2_crs_path."""
+
+ def test_extracts_crs_path(self):
+ from nssec.modules.waf.status import _parse_security2_crs_path
+
+ content = """\
+
+ IncludeOptional /etc/modsecurity/modsecurity.conf
+ IncludeOptional /etc/modsecurity/crs/crs-setup.conf
+ IncludeOptional /etc/modsecurity/crs/rules/*.conf
+
+"""
+ assert _parse_security2_crs_path(content) == "/etc/modsecurity/crs"
+
+ def test_extracts_apt_crs_path(self):
+ from nssec.modules.waf.status import _parse_security2_crs_path
+
+ content = "IncludeOptional /usr/share/modsecurity-crs/crs-setup.conf"
+ assert _parse_security2_crs_path(content) == "/usr/share/modsecurity-crs"
+
+ def test_returns_none_when_no_crs(self):
+ from nssec.modules.waf.status import _parse_security2_crs_path
+
+ content = "IncludeOptional /etc/modsecurity/modsecurity.conf"
+ assert _parse_security2_crs_path(content) is None
+
+
+class TestExclusionsHashDrift:
+ """Tests for template hash drift detection."""
+
+ def test_matching_hash_means_current(self):
+ from nssec.modules.waf.config import (
+ NS_EXCLUSIONS_HASH,
+ NS_EXCLUSIONS_TEMPLATE,
+ NS_EXCLUSIONS_VERSION,
+ )
+ from nssec.modules.waf.status import _parse_exclusions_meta
+
+ # Render the template with the current hash
+ rendered = Template(NS_EXCLUSIONS_TEMPLATE).render(
+ timestamp="test",
+ admin_ips=[],
+ nodeping_ips=[],
+ version=NS_EXCLUSIONS_VERSION,
+ template_hash=NS_EXCLUSIONS_HASH,
+ )
+ _, deployed_hash, _, _ = _parse_exclusions_meta(rendered)
+ assert deployed_hash == NS_EXCLUSIONS_HASH
+
+ def test_old_hash_means_outdated(self):
+ from nssec.modules.waf.config import NS_EXCLUSIONS_HASH
+ from nssec.modules.waf.status import _parse_exclusions_meta
+
+ content = "# nssec-exclusions-hash: stale_old_hash\n"
+ _, deployed_hash, _, _ = _parse_exclusions_meta(content)
+ assert deployed_hash != NS_EXCLUSIONS_HASH
+
+ def test_missing_hash_means_outdated(self):
+ from nssec.modules.waf.config import NS_EXCLUSIONS_HASH
+ from nssec.modules.waf.status import _parse_exclusions_meta
+
+ content = "# Old file without hash\n"
+ _, deployed_hash, _, _ = _parse_exclusions_meta(content)
+ assert deployed_hash is None
+ assert deployed_hash != NS_EXCLUSIONS_HASH
+
+
+class TestExclusionsTemplateHash:
+ """Tests for the template hash computation."""
+
+ def test_hash_is_deterministic(self):
+ from nssec.modules.waf.config import _exclusions_template_hash
+
+ assert _exclusions_template_hash() == _exclusions_template_hash()
+
+ def test_hash_is_12_chars(self):
+ from nssec.modules.waf.config import NS_EXCLUSIONS_HASH
+
+ assert len(NS_EXCLUSIONS_HASH) == 12
+ assert all(c in "0123456789abcdef" for c in NS_EXCLUSIONS_HASH)
diff --git a/tests/unit/test_waf_utils.py b/tests/unit/test_waf_utils.py
new file mode 100644
index 0000000..2f80b19
--- /dev/null
+++ b/tests/unit/test_waf_utils.py
@@ -0,0 +1,255 @@
+"""Tests for WAF utility functions."""
+
+from unittest.mock import patch
+
+import pytest
+
+
+class TestVersionGte:
+ """Tests for version_gte helper."""
+
+ def test_equal_versions(self):
+ from nssec.modules.waf.utils import version_gte
+
+ assert version_gte("2.9.6", "2.9.6") is True
+
+ def test_greater_patch(self):
+ from nssec.modules.waf.utils import version_gte
+
+ assert version_gte("2.9.7", "2.9.6") is True
+
+ def test_greater_minor(self):
+ from nssec.modules.waf.utils import version_gte
+
+ assert version_gte("2.10.0", "2.9.6") is True
+
+ def test_greater_major(self):
+ from nssec.modules.waf.utils import version_gte
+
+ assert version_gte("3.0.0", "2.9.6") is True
+
+ def test_less_than(self):
+ from nssec.modules.waf.utils import version_gte
+
+ assert version_gte("2.9.5", "2.9.6") is False
+
+ def test_none_returns_false(self):
+ from nssec.modules.waf.utils import version_gte
+
+ assert version_gte(None, "2.9.6") is False
+
+ def test_empty_string_returns_false(self):
+ from nssec.modules.waf.utils import version_gte
+
+ assert version_gte("", "2.9.6") is False
+
+ def test_invalid_version_returns_false(self):
+ from nssec.modules.waf.utils import version_gte
+
+ assert version_gte("not.a.version", "2.9.6") is False
+
+ def test_two_component_versions(self):
+ from nssec.modules.waf.utils import version_gte
+
+ assert version_gte("2.10", "2.9") is True
+ assert version_gte("2.8", "2.9") is False
+
+
+class TestDetectModsecVersion:
+ """Tests for detect_modsec_version."""
+
+ def test_returns_version_string(self):
+ from nssec.modules.waf.utils import detect_modsec_version
+
+ with patch("nssec.modules.waf.utils.run_cmd", return_value=("2.9.5-3", "", 0)):
+ ver = detect_modsec_version()
+ assert ver == "2.9.5"
+
+ def test_returns_none_when_not_installed(self):
+ from nssec.modules.waf.utils import detect_modsec_version
+
+ with patch("nssec.modules.waf.utils.run_cmd", return_value=("", "not installed", 1)):
+ ver = detect_modsec_version()
+ assert ver is None
+
+ def test_strips_debian_suffix(self):
+ from nssec.modules.waf.utils import detect_modsec_version
+
+ with patch("nssec.modules.waf.utils.run_cmd", return_value=("2.9.7-1ubuntu2", "", 0)):
+ ver = detect_modsec_version()
+ assert ver == "2.9.7"
+
+ def test_returns_none_on_empty_stdout(self):
+ from nssec.modules.waf.utils import detect_modsec_version
+
+ with patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)):
+ ver = detect_modsec_version()
+ assert ver is None
+
+
+DEBIAN_DEFAULT_SEC2 = """\
+
+ SecDataDir /var/cache/modsecurity
+ IncludeOptional /etc/modsecurity/*.conf
+ IncludeOptional /usr/share/modsecurity-crs/*.load
+
+"""
+
+NSSEC_MANAGED_SEC2 = """\
+
+ IncludeOptional /etc/modsecurity/modsecurity.conf
+ IncludeOptional /etc/modsecurity/crs/crs-setup.conf
+ IncludeOptional /etc/modsecurity/crs/rules/*.conf
+
+"""
+
+
+class TestParseSecurity2Conf:
+ """Tests for parse_security2_conf utility."""
+
+ def test_detects_wildcard_include(self):
+ from nssec.modules.waf.utils import parse_security2_conf
+
+ with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2):
+ has_wildcard, has_crs_load = parse_security2_conf("/fake")
+ assert has_wildcard is True
+ assert has_crs_load is True
+
+ def test_detects_no_wildcard(self):
+ from nssec.modules.waf.utils import parse_security2_conf
+
+ with patch("nssec.modules.waf.utils.read_file", return_value=NSSEC_MANAGED_SEC2):
+ has_wildcard, has_crs_load = parse_security2_conf("/fake")
+ assert has_wildcard is False
+ assert has_crs_load is False
+
+ def test_ignores_commented_lines(self):
+ from nssec.modules.waf.utils import parse_security2_conf
+
+ content = """\
+
+ # IncludeOptional /etc/modsecurity/*.conf
+ # IncludeOptional /usr/share/modsecurity-crs/*.load
+
+"""
+ with patch("nssec.modules.waf.utils.read_file", return_value=content):
+ has_wildcard, has_crs_load = parse_security2_conf("/fake")
+ assert has_wildcard is False
+ assert has_crs_load is False
+
+ def test_returns_false_when_file_missing(self):
+ from nssec.modules.waf.utils import parse_security2_conf
+
+ with patch("nssec.modules.waf.utils.read_file", return_value=None):
+ has_wildcard, has_crs_load = parse_security2_conf("/fake")
+ assert has_wildcard is False
+ assert has_crs_load is False
+
+
+class TestAppendCrsToSecurity2:
+ """Tests for append_crs_to_security2 utility."""
+
+ def test_appends_crs_includes(self):
+ from nssec.modules.waf.utils import append_crs_to_security2
+
+ written = {}
+
+ def capture_write(path, content):
+ written["content"] = content
+ return True
+
+ with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2), \
+ patch("nssec.modules.waf.utils.backup_file"), \
+ patch("nssec.modules.waf.utils.write_file", side_effect=capture_write):
+ result = append_crs_to_security2("/etc/modsecurity/crs")
+
+ assert result is True
+ content = written["content"]
+ assert "IncludeOptional /etc/modsecurity/crs/crs-setup.conf" in content
+ assert "IncludeOptional /etc/modsecurity/crs/rules/*.conf" in content
+ assert "added by nssec" in content
+
+ def test_comments_out_old_v3_load_line(self):
+ """Should comment out the apt v3 *.load line to prevent dual-loading."""
+ from nssec.modules.waf.utils import append_crs_to_security2
+
+ written = {}
+
+ def capture_write(path, content):
+ written["content"] = content
+ return True
+
+ with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2), \
+ patch("nssec.modules.waf.utils.backup_file"), \
+ patch("nssec.modules.waf.utils.write_file", side_effect=capture_write):
+ append_crs_to_security2("/etc/modsecurity/crs")
+
+ content = written["content"]
+ # The old v3 line should be commented out
+ assert "# IncludeOptional /usr/share/modsecurity-crs/*.load" in content
+ assert "Disabled by nssec" in content
+ # It should NOT appear as an active (uncommented) directive
+ for line in content.splitlines():
+ stripped = line.strip()
+ if stripped.startswith("#"):
+ continue
+ assert "modsecurity-crs/*.load" not in stripped
+
+ def test_preserves_wildcard_modsecurity_conf(self):
+ """Should NOT touch the /etc/modsecurity/*.conf wildcard."""
+ from nssec.modules.waf.utils import append_crs_to_security2
+
+ written = {}
+
+ def capture_write(path, content):
+ written["content"] = content
+ return True
+
+ with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2), \
+ patch("nssec.modules.waf.utils.backup_file"), \
+ patch("nssec.modules.waf.utils.write_file", side_effect=capture_write):
+ append_crs_to_security2("/etc/modsecurity/crs")
+
+ content = written["content"]
+ # The modsecurity/*.conf line should remain active (not commented)
+ active_lines = [
+ l.strip() for l in content.splitlines()
+ if not l.strip().startswith("#") and l.strip()
+ ]
+ assert any("/etc/modsecurity/*.conf" in l for l in active_lines)
+
+ def test_does_not_comment_out_already_commented_lines(self):
+ """Should not double-comment already commented CRS lines."""
+ from nssec.modules.waf.utils import append_crs_to_security2
+
+ content_with_comment = """\
+
+ SecDataDir /var/cache/modsecurity
+ IncludeOptional /etc/modsecurity/*.conf
+ # IncludeOptional /usr/share/modsecurity-crs/*.load
+
+"""
+ written = {}
+
+ def capture_write(path, content):
+ written["content"] = content
+ return True
+
+ with patch("nssec.modules.waf.utils.read_file", return_value=content_with_comment), \
+ patch("nssec.modules.waf.utils.backup_file"), \
+ patch("nssec.modules.waf.utils.write_file", side_effect=capture_write):
+ append_crs_to_security2("/etc/modsecurity/crs")
+
+ content = written["content"]
+ # Should not have "Disabled by nssec" since the line was already commented
+ assert "Disabled by nssec" not in content
+
+ def test_returns_false_on_write_failure(self):
+ from nssec.modules.waf.utils import append_crs_to_security2
+
+ with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2), \
+ patch("nssec.modules.waf.utils.backup_file"), \
+ patch("nssec.modules.waf.utils.write_file", return_value=False):
+ result = append_crs_to_security2("/etc/modsecurity/crs")
+
+ assert result is False