diff --git a/README.md b/README.md index c3bfe33..ad4af5f 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ pip install -e . |----|-------------------|--------| | Ubuntu 24.04 LTS | v44.x | Tested | | Ubuntu 22.04 LTS | v44.x | Tested | -| Ubuntu 20.04 LTS | v44.x | Binary/.deb only | +| Ubuntu 20.04 LTS | v44.x | Source Only | Other Debian-based distributions may work but are untested. Contributions and test reports for additional platforms are welcome. @@ -97,9 +97,11 @@ sudo nssec waf enable The WAF module includes: - OWASP CRS v4 with paranoia level 1 (low false positive rate) -- NetSapiens exclusion rules for admin UI, ns-api, SiPbx, NqsProxy, and iNSight health checks +- NetSapiens exclusion rules for admin UI, ns-api, SiPbx, NqsProxy, portal login, phone provisioning, iNSight health checks, and localhost traffic - CRS tuning for allowed HTTP methods and content types used by NetSapiens +WAF rule templates (exclusions and CRS setup overrides) are defined in `src/nssec/modules/waf/config.py` and deployed to `/etc/modsecurity/` by `nssec waf init`. + ### Path Restrictions (.htaccess) Restrict access to sensitive NetSapiens paths (admin UI, API, NDP, recording) using `.htaccess` IP allowlists: @@ -167,24 +169,21 @@ Start with `standard` and review the Apache API Usage dashboard and mod_evasive | Component | Core | NDP | Recording | QoS | |-----------|:----:|:---:|:---------:|:---:| -| WAF — Admin UI | Yes | — | — | — | -| WAF — Endpoints | — | Yes | — | — | -| WAF — Large Upload | — | — | Yes | — | +| WAF | Yes | Yes | Yes | Yes | | mTLS Provisioning | — | Yes | — | — | -| MySQL Hardening | Yes | — | — | — | - -## Grafana Dashboards & Insight Templates +| MySQL Hardening | Yes | Yes | Yes | Yes | -Pre-built dashboards are available for import into your Grafana/iNSight instance: +## iNSight Templates -**Dashboards** (`dashboards/`): -- `security/apacheHttpServerLogs.json` — Apache error and access logs with HTTP status breakdown +Pre-built dashboards for import into your iNSight/Grafana instance (`insight/`): -**Insight Templates** (`insight/`): - `api.json` — API v1/v2 request rate monitoring (Prometheus) - `apacheApiUsage.json` — Apache access log analysis by IP and path (Loki) - `modsecurityWaf.json` — ModSecurity WAF event analysis: severity, attacking IPs, triggered rules, targeted URIs (Loki) - `modEvasive.json` — mod_evasive HTTP flood protection: blocked IPs, block rate, repeat offenders (Loki) +- `sshLogin.json` — SSH login monitor: failed/successful logins, brute-force source IPs, targeted usernames (Loki) + +![WAF iNSight Dashboard](docs/img/waf-insight.png) ## Related Projects diff --git a/docs/img/waf-insight.png b/docs/img/waf-insight.png new file mode 100644 index 0000000..5d66780 Binary files /dev/null and b/docs/img/waf-insight.png differ diff --git a/insight/sshLogin.json b/insight/sshLogin.json new file mode 100644 index 0000000..9d453ac --- /dev/null +++ b/insight/sshLogin.json @@ -0,0 +1,1045 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "datasource", + "uid": "grafana" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "target": { + "limit": 100, + "matchAny": false, + "tags": [], + "type": "dashboard" + }, + "type": "dashboard" + } + ] + }, + "description": "Dashboard for monitoring SSH login activity: failed/successful logins, brute-force sources, and targeted usernames from /var/log/auth.log", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [], + "liveNow": false, + "panels": [ + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 100, + "panels": [], + "title": "Overview Statistics", + "type": "row" + }, + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 50 + }, + { + "color": "red", + "value": 200 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 5, + "x": 0, + "y": 1 + }, + "id": 101, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "sum" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "9.1.6", + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "editorMode": "code", + "expr": "sum(count_over_time({instance=~\"$node\", filename=\"/var/log/auth.log\"} |= \"Failed password\" |~ `(?i)$ip_filter` [$__range]))", + "queryType": "range", + "refId": "A" + } + ], + "title": "Failed Logins", + "type": "stat" + }, + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 5, + "x": 5, + "y": 1 + }, + "id": 102, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "sum" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "9.1.6", + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "editorMode": "code", + "expr": "sum(count_over_time({instance=~\"$node\", filename=\"/var/log/auth.log\"} |~ \"Accepted (password|publickey|keyboard-interactive) for\" |~ `(?i)$ip_filter` [$__range]))", + "queryType": "range", + "refId": "A" + } + ], + "title": "SSH Sessions", + "type": "stat" + }, + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 5, + "x": 10, + "y": 1 + }, + "id": 105, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "sum" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "9.1.6", + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "editorMode": "code", + "expr": "count(sum by (username, src_ip) (count_over_time({instance=~\"$node\", filename=\"/var/log/auth.log\"} |~ \"Accepted (password|publickey|keyboard-interactive) for\" |~ `(?i)$ip_filter` | regexp `for (?P\\S+) from (?P\\d+\\.\\d+\\.\\d+\\.\\d+)` [$__range])))", + "queryType": "instant", + "refId": "A" + } + ], + "title": "Unique Logins", + "description": "Distinct user + source IP combinations", + "type": "stat" + }, + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 5 + }, + { + "color": "red", + "value": 20 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 5, + "x": 15, + "y": 1 + }, + "id": 103, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "sum" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "9.1.6", + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "editorMode": "code", + "expr": "count(sum by (src_ip) (count_over_time({instance=~\"$node\", filename=\"/var/log/auth.log\"} |= \"Failed password\" |~ `(?i)$ip_filter` | regexp `from (?P\\d+\\.\\d+\\.\\d+\\.\\d+)` [$__range])))", + "queryType": "instant", + "refId": "A" + } + ], + "title": "Unique Source IPs (Failed)", + "type": "stat" + }, + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 10 + }, + { + "color": "red", + "value": 50 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 4, + "x": 20, + "y": 1 + }, + "id": 104, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "sum" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "9.1.6", + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "editorMode": "code", + "expr": "sum(count_over_time({instance=~\"$node\", filename=\"/var/log/auth.log\"} |= \"Invalid user\" |~ `(?i)$ip_filter` [$__range]))", + "queryType": "range", + "refId": "A" + } + ], + "title": "Invalid Users", + "type": "stat" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 5 + }, + "id": 200, + "panels": [], + "title": "Login Trends", + "type": "row" + }, + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "count", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "bars", + "fillOpacity": 50, + "gradientMode": "scheme", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "Failed" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "red", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Accepted" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "green", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 6 + }, + "id": 201, + "options": { + "legend": { + "calcs": [ + "sum", + "max", + "mean" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "9.1.6", + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "editorMode": "code", + "expr": "sum(count_over_time({instance=~\"$node\", filename=\"/var/log/auth.log\"} |= \"Failed password\" |~ `(?i)$ip_filter` [$__interval]))", + "legendFormat": "Failed", + "queryType": "range", + "refId": "A" + }, + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "editorMode": "code", + "expr": "sum(count_over_time({instance=~\"$node\", filename=\"/var/log/auth.log\"} |~ \"Accepted (password|publickey|keyboard-interactive) for\" |~ `(?i)$ip_filter` [$__interval]))", + "legendFormat": "Accepted", + "queryType": "range", + "refId": "B" + } + ], + "title": "Login Attempts Over Time", + "type": "timeseries" + }, + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 6 + }, + "id": 202, + "options": { + "legend": { + "calcs": [ + "sum", + "max", + "mean" + ], + "displayMode": "table", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "9.1.6", + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "editorMode": "code", + "expr": "topk(10, sum by (src_ip) (count_over_time({instance=~\"$node\", filename=\"/var/log/auth.log\"} |= \"Failed password\" |~ `(?i)$ip_filter` | regexp `from (?P\\d+\\.\\d+\\.\\d+\\.\\d+)` [$__interval])))", + "legendFormat": "{{src_ip}}", + "queryType": "range", + "refId": "A" + } + ], + "title": "Failed Logins by IP Over Time", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 14 + }, + "id": 300, + "panels": [], + "title": "Top Offenders", + "type": "row" + }, + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "left", + "displayMode": "auto", + "inspect": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 10 + }, + { + "color": "red", + "value": 50 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "IP Address" + }, + "properties": [ + { + "id": "custom.width", + "value": 200 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Failed Count" + }, + "properties": [ + { + "id": "custom.width", + "value": 150 + }, + { + "id": "custom.displayMode", + "value": "gradient-gauge" + } + ] + } + ] + }, + "gridPos": { + "h": 10, + "w": 12, + "x": 0, + "y": 15 + }, + "id": 301, + "options": { + "footer": { + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true, + "sortBy": [ + { + "desc": true, + "displayName": "Failed Count" + } + ] + }, + "pluginVersion": "9.1.6", + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "editorMode": "code", + "expr": "topk(20, sum by (src_ip) (count_over_time({instance=~\"$node\", filename=\"/var/log/auth.log\"} |= \"Failed password\" |~ `(?i)$ip_filter` | regexp `from (?P\\d+\\.\\d+\\.\\d+\\.\\d+)` [$__range])))", + "legendFormat": "{{src_ip}}", + "queryType": "instant", + "refId": "A" + } + ], + "title": "Top 20 Failed IPs", + "transformations": [ + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true + }, + "indexByName": { + "src_ip": 0, + "Value": 1 + }, + "renameByName": { + "Value": "Failed Count", + "src_ip": "IP Address" + } + } + } + ], + "type": "table" + }, + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "left", + "displayMode": "auto", + "inspect": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 10 + }, + { + "color": "red", + "value": 50 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "Username" + }, + "properties": [ + { + "id": "custom.width", + "value": 200 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Attempt Count" + }, + "properties": [ + { + "id": "custom.width", + "value": 150 + }, + { + "id": "custom.displayMode", + "value": "gradient-gauge" + } + ] + } + ] + }, + "gridPos": { + "h": 10, + "w": 12, + "x": 12, + "y": 15 + }, + "id": 302, + "options": { + "footer": { + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true, + "sortBy": [ + { + "desc": true, + "displayName": "Attempt Count" + } + ] + }, + "pluginVersion": "9.1.6", + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "editorMode": "code", + "expr": "topk(20, sum by (username) (count_over_time({instance=~\"$node\", filename=\"/var/log/auth.log\"} |= \"Failed password\" |~ `(?i)$ip_filter` | regexp `Failed password for (invalid user )?(?P\\S+)` [$__range])))", + "legendFormat": "{{username}}", + "queryType": "instant", + "refId": "A" + } + ], + "title": "Top 20 Targeted Usernames", + "transformations": [ + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true + }, + "indexByName": { + "username": 0, + "Value": 1 + }, + "renameByName": { + "Value": "Attempt Count", + "username": "Username" + } + } + } + ], + "type": "table" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 25 + }, + "id": 400, + "panels": [], + "title": "Auth Logs", + "type": "row" + }, + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "gridPos": { + "h": 15, + "w": 24, + "x": 0, + "y": 26 + }, + "id": 401, + "options": { + "dedupStrategy": "none", + "enableLogDetails": true, + "prettifyLogMessage": false, + "showCommonLabels": false, + "showLabels": false, + "showTime": true, + "sortOrder": "Descending", + "wrapLogMessage": true + }, + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "${loki_datasource}" + }, + "editorMode": "code", + "expr": "{instance=~\"$node\", filename=\"/var/log/auth.log\"} |~ `(?i)$ip_filter`", + "queryType": "range", + "refId": "A" + } + ], + "title": "Auth Log Stream", + "type": "logs" + } + ], + "refresh": "30s", + "schemaVersion": 37, + "style": "dark", + "tags": [ + "ssh", + "auth", + "security" + ], + "templating": { + "list": [ + { + "current": { + "selected": false, + "text": "", + "value": "" + }, + "hide": 2, + "includeAll": false, + "label": "", + "multi": false, + "name": "prometheus_datasource", + "options": [], + "query": "prometheus", + "queryValue": "", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "type": "datasource" + }, + { + "current": { + "selected": false, + "text": "", + "value": "" + }, + "hide": 2, + "includeAll": false, + "label": "", + "multi": false, + "name": "loki_datasource", + "options": [], + "query": "loki", + "queryValue": "", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "type": "datasource" + }, + { + "current": { + "selected": false, + "text": "All", + "value": "$__all" + }, + "datasource": { + "type": "prometheus", + "uid": "${prometheus_datasource}" + }, + "definition": "label_values(insight_log_rate,instance)", + "hide": 0, + "includeAll": true, + "label": "Host:", + "multi": true, + "name": "node", + "options": [], + "query": { + "query": "label_values(insight_log_rate,instance)", + "refId": "StandardVariableQuery" + }, + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "type": "query" + }, + { + "current": { + "selected": false, + "text": "", + "value": "" + }, + "hide": 0, + "label": "IP Filter (regex):", + "name": "ip_filter", + "options": [ + { + "selected": true, + "text": "", + "value": "" + } + ], + "query": "", + "skipUrlSync": false, + "type": "textbox" + } + ] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": { + "refresh_intervals": [ + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h" + ] + }, + "timezone": "", + "title": "SSH Login Monitor", + "uid": "ssh-login-monitor", + "version": 1, + "weekStart": "" +} diff --git a/rules/crs/crs-setup-netsapiens.conf b/rules/crs/crs-setup-netsapiens.conf deleted file mode 100644 index a93c0fe..0000000 --- a/rules/crs/crs-setup-netsapiens.conf +++ /dev/null @@ -1,47 +0,0 @@ -# OWASP CRS Setup Overrides for NetSapiens -# Source file — used as basis for /etc/modsecurity/crs/crs-setup.conf by nssec waf init -# -# Paranoia level 1 is conservative — increase after tuning false positives. -# See https://coreruleset.org/docs/concepts/paranoia_levels/ for guidance. - -# Paranoia level (1 = low FP, less coverage; 4 = max coverage, many FPs) -SecAction \ - "id:900000,\ - phase:1,\ - nolog,\ - pass,\ - t:none,\ - setvar:tx.crs_setup_version=400,\ - setvar:tx.paranoia_level=1,\ - setvar:tx.blocking_paranoia_level=1,\ - setvar:tx.detection_paranoia_level=1" - -# Anomaly scoring thresholds -# Inbound 5 = block after a single critical rule match (default) -# Outbound 4 = block on data leakage detection (default) -SecAction \ - "id:900110,\ - phase:1,\ - nolog,\ - pass,\ - t:none,\ - setvar:tx.inbound_anomaly_score_threshold=5,\ - setvar:tx.outbound_anomaly_score_threshold=4" - -# Allowed HTTP methods for NetSapiens admin UI and API -SecAction \ - "id:900200,\ - phase:1,\ - nolog,\ - pass,\ - t:none,\ - setvar:'tx.allowed_methods=GET HEAD POST OPTIONS PUT PATCH DELETE'" - -# Allowed content types -SecAction \ - "id:900220,\ - phase:1,\ - nolog,\ - pass,\ - t:none,\ - setvar:'tx.allowed_request_content_type=|application/x-www-form-urlencoded| |multipart/form-data| |multipart/related| |multipart/mixed| |text/xml| |application/xml| |application/soap+xml| |application/json| |application/cloudevents+json| |application/cloudevents-batch+json|'" diff --git a/rules/netsapiens/netsapiens-exclusions.conf b/rules/netsapiens/netsapiens-exclusions.conf deleted file mode 100644 index dd3c20b..0000000 --- a/rules/netsapiens/netsapiens-exclusions.conf +++ /dev/null @@ -1,77 +0,0 @@ -# NetSapiens-specific ModSecurity Exclusions -# Source file — installed to /etc/modsecurity/netsapiens-exclusions.conf by nssec waf init -# -# These rules prevent false positives on the NetSapiens management UI -# and API endpoints while keeping CRS protection active for everything else. -# -# Rule ID range: 1000001–1000999 (reserved for nssec) - -# ---- Admin UI form submissions trigger SQL injection false positives ---- -# The admin search/filter fields send SQL-like syntax that CRS flags incorrectly. -SecRuleUpdateTargetById 942100 "!REQUEST_COOKIES" -SecRuleUpdateTargetById 942200 "!REQUEST_COOKIES" - -# ---- NS API endpoints use base64 in query strings ---- -SecRule REQUEST_URI "@beginsWith /ns-api/" \ - "id:1000001,\ - phase:1,\ - pass,\ - nolog,\ - ctl:ruleRemoveTargetByTag=OWASP_CRS;ARGS:filter" - -# ---- Admin UI session handling ---- -# Rule 921180 (HTTP Parameter Pollution) false-positives on NS session cookies. -SecRule REQUEST_URI "@beginsWith /SiPbx/" \ - "id:1000002,\ - phase:1,\ - pass,\ - nolog,\ - ctl:ruleRemoveById=921180" - -# ---- QoS proxy (NqsProxy) ---- -# The QoS proxy sends multipart/mixed which triggers rule 920420 -# (content type not allowed by policy). This is normal NS internal traffic. -SecRule REQUEST_URI "@beginsWith /NqsProxy/" \ - "id:1000003,\ - phase:1,\ - pass,\ - nolog,\ - ctl:ruleRemoveById=920420" - -# ---- QoS proxy (NqsProxy) ---- -# The QoS proxy sends multipart/mixed which triggers rule 920420 -# (content type not allowed by policy). This is normal NS internal traffic. -SecRule REQUEST_URI "@beginsWith /NqsProxy/" \ - "id:1000003,\ - phase:1,\ - pass,\ - nolog,\ - ctl:ruleRemoveById=920420" - -# ---- iNSight health checks ---- -# The iNSight monitoring system polls /cfg/insight_healthcheck.html via AJP proxy. -# Apache denies these with 403 (mod_authz_core), and ModSecurity audit-logs them -# because it captures all 4xx/5xx responses. Bypass CRS for this endpoint. -SecRule REQUEST_URI "@beginsWith /cfg/insight_healthcheck" \ - "id:1000004,\ - phase:1,\ - pass,\ - nolog,\ - ctl:ruleRemoveByTag=OWASP_CRS" - -# ---- Localhost internal traffic ---- -# NetSapiens services (NmsSBus, ns-api, cfg) communicate over localhost. -# These internal calls trigger false positives: -# 921110 — NmsSBus RegistrarEvents body contains "connect " which -# matches the HTTP Request Smuggling pattern (connect is a SIP field, -# not an HTTP CONNECT method) -# 920180 — ns-api internal POSTs lack Content-Length header -# 920350 — internal requests use Host: 127.0.0.1 instead of a hostname -# Bypassing CRS for localhost is safe — if an attacker has local access, -# the WAF is already irrelevant. -SecRule REMOTE_ADDR "@ipMatch 127.0.0.1" \ - "id:1000005,\ - phase:1,\ - pass,\ - nolog,\ - ctl:ruleRemoveByTag=OWASP_CRS" diff --git a/src/nssec/cli/__init__.py b/src/nssec/cli/__init__.py index a3e51fa..df23551 100644 --- a/src/nssec/cli/__init__.py +++ b/src/nssec/cli/__init__.py @@ -6,7 +6,6 @@ from __future__ import annotations from pathlib import Path -from typing import Optional import click from rich.console import Console @@ -36,7 +35,7 @@ def _is_within_allowed_bases(resolved: Path, bases: tuple[Path, ...]) -> bool: def validate_path( path_str: str, param_name: str, - allowed_bases: Optional[tuple[Path, ...]] = None, + allowed_bases: tuple[Path, ...] | None = None, must_be_within_cwd: bool = False, ) -> Path: """Validate a path to prevent path traversal attacks. diff --git a/src/nssec/cli/main.py b/src/nssec/cli/main.py index 46f5e25..cb1ba01 100644 --- a/src/nssec/cli/main.py +++ b/src/nssec/cli/main.py @@ -70,6 +70,7 @@ def cli(ctx, host, sudo): # Set up sudo for local execution if no host specified if sudo and not host: from nssec.core.ssh import set_use_sudo + set_use_sudo(True) if host: diff --git a/src/nssec/cli/mtls_commands.py b/src/nssec/cli/mtls_commands.py index 701058e..4e2d71e 100644 --- a/src/nssec/cli/mtls_commands.py +++ b/src/nssec/cli/mtls_commands.py @@ -23,11 +23,15 @@ def mtls(ctx): console.print("[bold]Allowlist Commands:[/bold]") console.print(" [cyan]nssec mtls allowlist show[/cyan] Show all whitelisted IPs") console.print(" [cyan]nssec mtls allowlist add[/cyan] Add an IP to the allowlist") - console.print(" [cyan]nssec mtls allowlist remove[/cyan] Remove an IP from the allowlist") + console.print( + " [cyan]nssec mtls allowlist remove[/cyan] Remove an IP from the allowlist" + ) console.print() console.print("[bold]NodePing Commands:[/bold]") console.print(" [cyan]nssec mtls nodeping show[/cyan] Show current NodePing IPs") - console.print(" [cyan]nssec mtls nodeping fetch[/cyan] Fetch IPs from NodePing (dry run)") + console.print( + " [cyan]nssec mtls nodeping fetch[/cyan] Fetch IPs from NodePing (dry run)" + ) console.print(" [cyan]nssec mtls nodeping update[/cyan] Fetch and apply NodePing IPs") console.print(" [cyan]nssec mtls nodeping remove[/cyan] Remove NodePing IPs section") diff --git a/src/nssec/cli/waf_commands.py b/src/nssec/cli/waf_commands.py index c1f05ef..dbe0e61 100644 --- a/src/nssec/cli/waf_commands.py +++ b/src/nssec/cli/waf_commands.py @@ -154,13 +154,17 @@ def _build_status_table(status): table.add_row("CRS path", status.crs_path) setup_val = _yn(status.crs_setup_present) if not status.crs_setup_present: - setup_val += " [red](rule 901001 will flag all traffic! run [cyan]nssec waf init[/cyan])[/red]" + setup_val += ( + " [red](rule 901001 will flag all traffic! run [cyan]nssec waf init[/cyan])[/red]" + ) table.add_row("crs-setup.conf", setup_val) else: table.add_row("OWASP CRS", "[red]not installed[/red]") if status.evasive_installed: - evasive_state = "[green]enabled[/green]" if status.evasive_enabled else "[yellow]disabled[/yellow]" + evasive_state = ( + "[green]enabled[/green]" if status.evasive_enabled else "[yellow]disabled[/yellow]" + ) table.add_row("mod_evasive", evasive_state) else: table.add_row("mod_evasive", "[dim]not installed[/dim]") @@ -181,11 +185,10 @@ def _build_status_table(status): elif not status.exclusions_current: v = status.exclusions_version or "unknown" excl_val = ( - f"[yellow]outdated (v{v})[/yellow] — " - "run [cyan]nssec waf update-exclusions[/cyan]" + f"[yellow]outdated (v{v})[/yellow] — run [cyan]nssec waf update-exclusions[/cyan]" ) else: - excl_val = "[green]active (v{})[/green]".format(status.exclusions_version) + excl_val = f"[green]active (v{status.exclusions_version})[/green]" table.add_row("NS exclusions", excl_val) if status.exclusions_admin_ips: table.add_row(" Admin IPs", str(status.exclusions_admin_ips)) @@ -301,9 +304,7 @@ def waf_enable(yes): if result.success: console.print(f"[green]{result.message}[/green]") console.print() - console.print( - "[bold]Tip:[/bold] To also enable HTTP flood protection, run:" - ) + console.print("[bold]Tip:[/bold] To also enable HTTP flood protection, run:") console.print(" [cyan]sudo nssec waf evasive enable[/cyan]") else: console.print(f"[red]Error: {result.error}[/red]") @@ -350,10 +351,9 @@ def waf_remove(yes): This disables the security2 module in Apache, effectively turning off the WAF completely. Use 'nssec waf init' to re-enable. """ - from nssec.modules.waf import ModSecurityInstaller - from nssec.modules.waf.utils import run_cmd, file_exists - from nssec.modules.waf.config import SECURITY2_LOAD from nssec.core.ssh import is_root + from nssec.modules.waf.config import SECURITY2_LOAD + from nssec.modules.waf.utils import file_exists, run_cmd if not is_root(): console.print("[red]Error: Must run as root (sudo nssec waf remove)[/red]") @@ -469,7 +469,6 @@ def waf_update(yes): rules that were disabled during init and validates the Apache config. """ from nssec.modules.waf import ModSecurityInstaller - from nssec.modules.waf.utils import detect_modsec_version, version_gte from nssec.modules.waf.config import ( CRS_INSTALL_DIR, DIGITALWAVE_KEY_URL, @@ -477,6 +476,7 @@ def waf_update(yes): DIGITALWAVE_LIST, DIGITALWAVE_REPO_URL, ) + from nssec.modules.waf.utils import detect_modsec_version, version_gte installer = ModSecurityInstaller() pf = installer.preflight() @@ -487,9 +487,7 @@ def waf_update(yes): if not version_gte(current_ver, "2.9.6"): console.print() - console.print( - "[yellow]ModSecurity < 2.9.6 — some CRS v4 rules are disabled.[/yellow]" - ) + console.print("[yellow]ModSecurity < 2.9.6 — some CRS v4 rules are disabled.[/yellow]") console.print( "Ubuntu 22.04 ships ModSecurity 2.9.5 which lacks support for " "multipart rules introduced in 2.9.6." @@ -499,8 +497,7 @@ def waf_update(yes): console.print() keyring = DIGITALWAVE_KEYRING console.print( - f" [cyan]curl -fsSL {DIGITALWAVE_KEY_URL} " - f"| sudo gpg --dearmor -o {keyring}[/cyan]" + f" [cyan]curl -fsSL {DIGITALWAVE_KEY_URL} | sudo gpg --dearmor -o {keyring}[/cyan]" ) console.print() # Escape square brackets so Rich doesn't treat [signed-by=...] as markup @@ -508,17 +505,14 @@ def waf_update(yes): repo = DIGITALWAVE_REPO_URL lst = DIGITALWAVE_LIST console.print( - f' [cyan]echo "deb {signed} {repo} $(lsb_release -sc) main" ' - f"| sudo tee {lst}[/cyan]" + f' [cyan]echo "deb {signed} {repo} $(lsb_release -sc) main" | sudo tee {lst}[/cyan]' ) console.print( f' [cyan]echo "deb {signed} {repo} $(lsb_release -sc)-backports main" ' f"| sudo tee -a {lst}[/cyan]" ) console.print() - console.print( - " [cyan]sudo apt-get update[/cyan]" - ) + console.print(" [cyan]sudo apt-get update[/cyan]") console.print( " [cyan]sudo apt-get install -t $(lsb_release -sc)-backports " "libapache2-mod-security2[/cyan]" @@ -534,12 +528,13 @@ def waf_update(yes): crs_path = pf.crs_path or CRS_INSTALL_DIR reenabled = installer._reenable_crs_rules(crs_path) if not reenabled: - console.print("[green]ModSecurity >= 2.9.6 and all CRS rules are active. Nothing to do.[/green]") + console.print( + "[green]ModSecurity >= 2.9.6 and all CRS rules are active. Nothing to do.[/green]" + ) return console.print( - f" [green]Done:[/green] Re-enabled {len(reenabled)} CRS rule(s): " - + ", ".join(reenabled) + f" [green]Done:[/green] Re-enabled {len(reenabled)} CRS rule(s): " + ", ".join(reenabled) ) val = installer.validate_config() @@ -593,7 +588,7 @@ def waf_allowlist_add(ip, yes): IP can be a single address (192.168.1.1) or CIDR notation (10.0.0.0/8). Allowlisted IPs bypass OWASP CRS rules for reduced false positives. """ - from nssec.modules.waf import ModSecurityInstaller, get_allowlisted_ips, add_allowlisted_ip + from nssec.modules.waf import ModSecurityInstaller, add_allowlisted_ip, get_allowlisted_ips installer = ModSecurityInstaller() pf = installer.preflight() @@ -678,7 +673,7 @@ def waf_evasive(ctx): "--profile", type=click.Choice(["standard", "strict"]), default="standard", - help="Threshold profile: standard (high thresholds, safe default) or strict (tuned for NS traffic)", + help="Threshold profile: standard (safe default) or strict (tuned for NS traffic)", ) def waf_evasive_enable(yes, profile): """Enable mod_evasive HTTP flood protection. @@ -708,15 +703,12 @@ def waf_evasive_enable(yes, profile): raise SystemExit(1) if not package_installed(EVASIVE_PACKAGE): - console.print( - "[red]Error: mod_evasive is not installed. " - "Run 'nssec waf init' first.[/red]" - ) + console.print("[red]Error: mod_evasive is not installed. Run 'nssec waf init' first.[/red]") raise SystemExit(1) thresholds = EVASIVE_PROFILES[profile] console.print( - f"[bold yellow]Warning:[/bold yellow] mod_evasive has no detection-only mode. " + "[bold yellow]Warning:[/bold yellow] mod_evasive has no detection-only mode. " "When enabled it [bold]will block[/bold] IPs that exceed thresholds (HTTP 403)." ) console.print(f" Profile: [cyan]{profile}[/cyan]") @@ -756,8 +748,8 @@ def waf_evasive_enable(yes, profile): @click.option("--yes", "-y", is_flag=True, help="Skip confirmation") def waf_evasive_disable(yes): """Disable mod_evasive HTTP flood protection.""" - from nssec.modules.waf import ModSecurityInstaller from nssec.core.ssh import is_root + from nssec.modules.waf import ModSecurityInstaller if not is_root(): console.print("[red]Error: Must run as root (sudo nssec waf evasive disable)[/red]") @@ -824,9 +816,7 @@ def waf_evasive_status(): console.print(f" Profile: [cyan]{profile}[/cyan]") if not enabled: - console.print( - "\n Enable with: [cyan]sudo nssec waf evasive enable[/cyan]" - ) + console.print("\n Enable with: [cyan]sudo nssec waf evasive enable[/cyan]") # ─── RESTRICT SUBCOMMANDS ─── @@ -916,7 +906,9 @@ def waf_restrict_show(): console.print(" Run [cyan]nssec waf restrict reapply[/cyan] after NS upgrades to restore") elif first_ips_managed: console.print("\n[bold]IP cache:[/bold] [yellow]not saved[/yellow]") - console.print(" Run [cyan]nssec waf restrict init[/cyan] to save IPs for reapply after upgrades") + console.print( + " Run [cyan]nssec waf restrict init[/cyan] to save IPs for reapply after upgrades" + ) @waf_restrict.command("init") @@ -940,9 +932,9 @@ def waf_restrict_init(ips, dry_run, yes): """ import ipaddress - from nssec.core.ssh import is_root from nssec.core.server_types import detect_server_type - from nssec.modules.waf.restrict import init_restrictions, collect_existing_ips + from nssec.core.ssh import is_root + from nssec.modules.waf.restrict import collect_existing_ips, init_restrictions if not is_root(): console.print("[red]Error: Must run as root (sudo nssec waf restrict init)[/red]") @@ -974,9 +966,7 @@ def waf_restrict_init(ips, dry_run, yes): "[bold]Enter IP addresses to allow access[/bold] " "(one per line, or space/comma separated)." ) - console.print( - " Include NetSapiens TAC IPs and your admin office IPs." - ) + console.print(" Include NetSapiens TAC IPs and your admin office IPs.") console.print(" 127.0.0.1 is always included automatically.") console.print(" Press Enter on a blank line when done.") console.print() @@ -1010,8 +1000,9 @@ def waf_restrict_init(ips, dry_run, yes): console.print() if dry_run: - results = init_restrictions(server_type, ip_list, dry_run=True, - merge_existing=merge_existing) + results = init_restrictions( + server_type, ip_list, dry_run=True, merge_existing=merge_existing + ) for name, result in results: label = f"[cyan]{name}:[/cyan] " if name else "" console.print(f" {label}{result.message}") @@ -1022,8 +1013,7 @@ def waf_restrict_init(ips, dry_run, yes): console.print("[yellow]Aborted.[/yellow]") return - results = init_restrictions(server_type, ip_list, - merge_existing=merge_existing) + results = init_restrictions(server_type, ip_list, merge_existing=merge_existing) any_error = False for name, result in results: label = f"[cyan]{name}:[/cyan] " if name else "" @@ -1051,8 +1041,8 @@ def waf_restrict_add(ip, yes): """ import ipaddress - from nssec.core.ssh import is_root from nssec.core.server_types import detect_server_type + from nssec.core.ssh import is_root from nssec.modules.waf.restrict import add_restricted_ip if not is_root(): @@ -1097,8 +1087,8 @@ def waf_restrict_remove(ip, yes): Cannot remove 127.0.0.1 (localhost is always required). """ - from nssec.core.ssh import is_root from nssec.core.server_types import detect_server_type + from nssec.core.ssh import is_root from nssec.modules.waf.restrict import remove_restricted_ip if not is_root(): @@ -1139,9 +1129,9 @@ def waf_restrict_reapply(dry_run, yes): Reads the saved IP list from /etc/nssec/restrict-ips.json and re-creates all managed .htaccess files. """ - from nssec.core.ssh import is_root from nssec.core.server_types import detect_server_type - from nssec.modules.waf.restrict import reapply_restrictions, load_cached_ips + from nssec.core.ssh import is_root + from nssec.modules.waf.restrict import load_cached_ips, reapply_restrictions if not is_root(): console.print("[red]Error: Must run as root (sudo nssec waf restrict reapply)[/red]") diff --git a/src/nssec/core/cache.py b/src/nssec/core/cache.py index da19ff6..0482031 100644 --- a/src/nssec/core/cache.py +++ b/src/nssec/core/cache.py @@ -22,14 +22,13 @@ import threading import time from pathlib import Path -from typing import Optional, Union from nssec.core import ssh def _remove_suffix(text: str, suffix: str) -> str: """Remove suffix from string (Python 3.8 compatible).""" - return text[:-len(suffix)] if suffix and text.endswith(suffix) else text + return text[: -len(suffix)] if suffix and text.endswith(suffix) else text def _run_subprocess(cmd: list[str], timeout: int = 30) -> tuple[str, int]: @@ -46,7 +45,7 @@ def _run_subprocess(cmd: list[str], timeout: int = 30) -> tuple[str, int]: return stdout, rc -def _parse_dpkg_line(line: str) -> Optional[str]: +def _parse_dpkg_line(line: str) -> str | None: """Parse a dpkg -l output line to extract package name if installed. Args: @@ -64,7 +63,7 @@ def _parse_dpkg_line(line: str) -> Optional[str]: return parts[1].split(":")[0] -def _parse_service_line(line: str) -> tuple[Optional[str], Optional[str]]: +def _parse_service_line(line: str) -> tuple[str | None, str | None]: """Parse a systemctl list-units output line to extract service names. Args: @@ -99,7 +98,7 @@ def _read_ufw_files() -> str: return content -def _safe_read_file(path: Union[str, Path]) -> Optional[str]: +def _safe_read_file(path: str | Path) -> str | None: """Safely read a file, returning None on any error (locally or via SSH). Args: @@ -138,15 +137,15 @@ def __init__(self, ttl: float = 0) -> None: self._dpkg_time: float = 0 # Service (systemctl) cache - self._active_services: Optional[set[str]] = None + self._active_services: set[str] | None = None self._services_time: float = 0 # File content cache - self._files: dict[str, Optional[str]] = {} + self._files: dict[str, str | None] = {} self._file_times: dict[str, float] = {} # UFW rules cache - self._ufw_rules: Optional[str] = None + self._ufw_rules: str | None = None self._ufw_rules_loaded: bool = False self._ufw_time: float = 0 @@ -286,7 +285,7 @@ def cached_service_active(self, service_name: str) -> bool: or f"{normalized}.service" in self._active_services ) - def cached_file_read(self, path: Union[str, Path]) -> Optional[str]: + def cached_file_read(self, path: str | Path) -> str | None: """Read file contents with caching. Caches file contents by path. Subsequent reads return cached content @@ -306,7 +305,7 @@ def cached_file_read(self, path: Union[str, Path]) -> Optional[str]: return content return self._load_and_cache_file(path, path_str) - def _get_valid_cached_file(self, path_str: str) -> tuple[bool, Optional[str]]: + def _get_valid_cached_file(self, path_str: str) -> tuple[bool, str | None]: """Get cached file content if valid and not expired. Args: @@ -322,7 +321,7 @@ def _get_valid_cached_file(self, path_str: str) -> tuple[bool, Optional[str]]: return False, None return True, self._files[path_str] - def _load_and_cache_file(self, path: Union[str, Path], path_str: str) -> Optional[str]: + def _load_and_cache_file(self, path: str | Path, path_str: str) -> str | None: """Load a file and store it in cache (internal, called with lock held). Args: @@ -338,7 +337,7 @@ def _load_and_cache_file(self, path: Union[str, Path], path_str: str) -> Optiona self._file_times[path_str] = time.time() return content - def cached_ufw_rules(self) -> Optional[str]: + def cached_ufw_rules(self) -> str | None: """Read UFW rules from config files with caching. Reads UFW rules from /etc/ufw/user.rules and /etc/ufw/user6.rules. @@ -352,7 +351,7 @@ def cached_ufw_rules(self) -> Optional[str]: return self._ufw_rules return self._load_ufw_rules() - def _load_ufw_rules(self) -> Optional[str]: + def _load_ufw_rules(self) -> str | None: """Load UFW rules into cache (internal, called with lock held). Returns: @@ -370,7 +369,7 @@ def _load_ufw_rules(self) -> Optional[str]: return self._cache_ufw_result(None) - def _cache_ufw_result(self, content: Optional[str]) -> Optional[str]: + def _cache_ufw_result(self, content: str | None) -> str | None: """Store UFW result in cache and return it. Args: @@ -433,7 +432,7 @@ def cached_service_active(service_name: str) -> bool: return session_cache.cached_service_active(service_name) -def cached_file_read(path: Union[str, Path]) -> Optional[str]: +def cached_file_read(path: str | Path) -> str | None: """Read file contents with caching (uses session cache). This is a convenience wrapper around session_cache.cached_file_read(). @@ -447,7 +446,7 @@ def cached_file_read(path: Union[str, Path]) -> Optional[str]: return session_cache.cached_file_read(path) -def cached_ufw_rules() -> Optional[str]: +def cached_ufw_rules() -> str | None: """Read UFW rules with caching (uses session cache). This is a convenience wrapper around session_cache.cached_ufw_rules(). diff --git a/src/nssec/core/checklist.py b/src/nssec/core/checklist.py index da6fe51..9b2a83e 100644 --- a/src/nssec/core/checklist.py +++ b/src/nssec/core/checklist.py @@ -6,7 +6,6 @@ from dataclasses import dataclass, field from enum import Enum from pathlib import Path -from typing import Optional from nssec.core import ssh @@ -40,9 +39,9 @@ class CheckResult: status: CheckStatus severity: Severity message: str - details: Optional[str] = None - remediation: Optional[str] = None - reference: Optional[str] = None + details: str | None = None + remediation: str | None = None + reference: str | None = None @dataclass @@ -87,15 +86,15 @@ class BaseCheck(ABC): name: str = "" description: str = "" severity: Severity = Severity.MEDIUM - applies_to: Optional[list[str]] = None # Server types this check applies to - reference: Optional[str] = None # Documentation reference + applies_to: list[str] | None = None # Server types this check applies to + reference: str | None = None # Documentation reference @abstractmethod def run(self) -> CheckResult: """Execute the check and return result.""" pass - def _pass(self, message: str, details: Optional[str] = None) -> CheckResult: + def _pass(self, message: str, details: str | None = None) -> CheckResult: return CheckResult( check_id=self.check_id, name=self.name, @@ -107,7 +106,7 @@ def _pass(self, message: str, details: Optional[str] = None) -> CheckResult: ) def _fail( - self, message: str, details: Optional[str] = None, remediation: Optional[str] = None + self, message: str, details: str | None = None, remediation: str | None = None ) -> CheckResult: return CheckResult( check_id=self.check_id, @@ -121,7 +120,7 @@ def _fail( ) def _warn( - self, message: str, details: Optional[str] = None, remediation: Optional[str] = None + self, message: str, details: str | None = None, remediation: str | None = None ) -> CheckResult: return CheckResult( check_id=self.check_id, @@ -143,7 +142,7 @@ def _skip(self, message: str) -> CheckResult: message=message, ) - def _error(self, message: str, details: Optional[str] = None) -> CheckResult: + def _error(self, message: str, details: str | None = None) -> CheckResult: return CheckResult( check_id=self.check_id, name=self.name, @@ -154,7 +153,7 @@ def _error(self, message: str, details: Optional[str] = None) -> CheckResult: ) -def run_command(cmd: list[str], timeout: int = 30) -> tuple[Optional[str], Optional[str], int]: +def run_command(cmd: list[str], timeout: int = 30) -> tuple[str | None, str | None, int]: """Run a command locally or remotely (if SSH host is configured). Args: @@ -187,7 +186,7 @@ def file_contains(path: Path, pattern: str, ignore_comments: bool = True) -> boo return False -def _extract_config_value(line: str, key: str, separator: str) -> Optional[str]: +def _extract_config_value(line: str, key: str, separator: str) -> str | None: """Extract a value from a config line if it matches the key.""" line = line.strip() if line.startswith("#"): @@ -200,7 +199,7 @@ def _extract_config_value(line: str, key: str, separator: str) -> Optional[str]: return None -def get_file_value(path: Path, key: str, separator: str = " ") -> Optional[str]: +def get_file_value(path: Path, key: str, separator: str = " ") -> str | None: """Get a configuration value from a file (works locally or via SSH).""" content = ssh.read_file(str(path)) if content is None: diff --git a/src/nssec/core/server_types.py b/src/nssec/core/server_types.py index 8e423a7..8d05e2e 100644 --- a/src/nssec/core/server_types.py +++ b/src/nssec/core/server_types.py @@ -4,7 +4,6 @@ from dataclasses import dataclass from enum import Enum -from typing import Optional from nssec.core.checklist import run_command @@ -144,7 +143,7 @@ class ServiceInfo: ] -def _run_command(cmd: list[str], timeout: int = 10) -> Optional[str]: +def _run_command(cmd: list[str], timeout: int = 10) -> str | None: """Run a command and return stdout, or None on error. This is a convenience wrapper around run_command from checklist.py @@ -352,7 +351,7 @@ def get_server_info() -> dict: } -def get_applicable_security_modules(server_type: Optional[ServerType] = None) -> list[str]: +def get_applicable_security_modules(server_type: ServerType | None = None) -> list[str]: """Get security modules applicable to this server type. Args: diff --git a/src/nssec/core/ssh.py b/src/nssec/core/ssh.py index 91291bf..5458bac 100644 --- a/src/nssec/core/ssh.py +++ b/src/nssec/core/ssh.py @@ -21,16 +21,15 @@ import os import subprocess from pathlib import Path -from typing import Optional # Global remote host - when set, all commands execute via SSH -_remote_host: Optional[str] = None +_remote_host: str | None = None # Global sudo flag - when set, commands are prefixed with sudo _use_sudo: bool = False -def set_remote_host(host: Optional[str]) -> None: +def set_remote_host(host: str | None) -> None: """Set the remote host for SSH execution. Args: @@ -41,7 +40,7 @@ def set_remote_host(host: Optional[str]) -> None: _remote_host = host -def get_remote_host() -> Optional[str]: +def get_remote_host() -> str | None: """Get the current remote host, or None if running locally.""" return _remote_host @@ -119,8 +118,8 @@ def __init__(self, host: str, timeout: int = 30, use_sudo: bool = False): def run_command( self, cmd: list[str], - timeout: Optional[int] = None, - use_sudo: Optional[bool] = None, + timeout: int | None = None, + use_sudo: bool | None = None, ) -> tuple[str, str, int]: """Run a command on the remote host via SSH. @@ -164,7 +163,7 @@ def run_command( except Exception as e: return "", str(e), -1 - def read_file(self, path: str) -> Optional[str]: + def read_file(self, path: str) -> str | None: """Read a file from the remote host. Args: @@ -232,10 +231,10 @@ def _shell_quote(s: str) -> str: # Global executor instance (created when remote host is set) -_executor: Optional[SSHExecutor] = None +_executor: SSHExecutor | None = None -def get_executor() -> Optional[SSHExecutor]: +def get_executor() -> SSHExecutor | None: """Get the global SSH executor, or None if running locally.""" global _executor if _remote_host: @@ -283,7 +282,7 @@ def run_command(cmd: list[str], timeout: int = 30) -> tuple[str, str, int]: return "", str(e), -1 -def read_file(path: str) -> Optional[str]: +def read_file(path: str) -> str | None: """Read a file locally or remotely depending on configuration. Args: diff --git a/src/nssec/modules/waf/__init__.py b/src/nssec/modules/waf/__init__.py index f228af2..1688e9c 100644 --- a/src/nssec/modules/waf/__init__.py +++ b/src/nssec/modules/waf/__init__.py @@ -11,7 +11,6 @@ from pathlib import Path from nssec.core.ssh import is_directory, is_root - from nssec.modules.waf.config import ( BACKUP_SUFFIX, CRS_APT_PACKAGE, @@ -22,8 +21,8 @@ CRS_SETUP_OVERRIDES_TEMPLATE, EVASIVE_CONF, EVASIVE_CONF_TEMPLATE, - EVASIVE_LOAD, EVASIVE_DEFAULT_PROFILE, + EVASIVE_LOAD, EVASIVE_LOG_DIR, EVASIVE_LOG_FILE, EVASIVE_PACKAGE, diff --git a/src/nssec/modules/waf/config.py b/src/nssec/modules/waf/config.py index 38b6073..0b28ede 100644 --- a/src/nssec/modules/waf/config.py +++ b/src/nssec/modules/waf/config.py @@ -72,7 +72,7 @@ BACKUP_SUFFIX = ".bak.nssec" # Exclusions template version — human-readable label for the template revision. -NS_EXCLUSIONS_VERSION = "2" +NS_EXCLUSIONS_VERSION = "3" # --------------------------------------------------------------------------- # Jinja2 Templates @@ -186,15 +186,16 @@ # Main ModSecurity config IncludeOptional /etc/modsecurity/modsecurity.conf - # NetSapiens-specific exclusions - IncludeOptional /etc/modsecurity/netsapiens-exclusions.conf - # OWASP CRS setup and rules IncludeOptional {{ crs_path }}/crs-setup.conf IncludeOptional {{ crs_path }}/plugins/*-config.conf IncludeOptional {{ crs_path }}/plugins/*-before.conf IncludeOptional {{ crs_path }}/rules/*.conf IncludeOptional {{ crs_path }}/plugins/*-after.conf + + # NetSapiens-specific exclusions (MUST load after CRS rules so that + # SecRuleUpdateTargetById directives can find the rules they modify) + IncludeOptional /etc/modsecurity/netsapiens-exclusions.conf """ @@ -263,6 +264,15 @@ nolog,\\ ctl:responseBodyAccess=Off" +# ---- Portal login password false positives ---- +# Passwords with shell metacharacters ($, ~, ^, |) trigger RCE rule 932270. +SecRule REQUEST_URI "@beginsWith /portal/login/login" \\ + "id:1000008,\\ + phase:2,\\ + pass,\\ + nolog,\\ + ctl:ruleRemoveTargetById=932270;ARGS:data[Login][password]" + # ---- iNSight health checks ---- SecRule REQUEST_URI "@beginsWith /cfg/insight_healthcheck" \\ "id:1000006,\\ @@ -322,7 +332,7 @@ nolog,\\ pass,\\ t:none,\\ - setvar:tx.crs_setup_version=400,\\ + setvar:tx.crs_setup_version=480,\\ setvar:tx.paranoia_level={{ paranoia_level }},\\ setvar:tx.blocking_paranoia_level={{ paranoia_level }},\\ setvar:tx.detection_paranoia_level={{ paranoia_level }}" diff --git a/src/nssec/modules/waf/restrict.py b/src/nssec/modules/waf/restrict.py index 9ef89ed..8afa78d 100644 --- a/src/nssec/modules/waf/restrict.py +++ b/src/nssec/modules/waf/restrict.py @@ -146,13 +146,15 @@ def get_restrict_status(server_type: str) -> list[dict]: managed = is_nssec_managed(path) if exists else False ips = parse_htaccess_ips(path) if exists else [] - statuses.append({ - "name": target["name"], - "path": path, - "exists": exists, - "managed": managed, - "ips": ips, - }) + statuses.append( + { + "name": target["name"], + "path": path, + "exists": exists, + "managed": managed, + "ips": ips, + } + ) return statuses @@ -246,10 +248,15 @@ def init_restrictions( results: list[tuple[str, StepResult]] = [] if not targets: - results.append(("", StepResult( - skipped=True, - message="No applicable targets found for this server type", - ))) + results.append( + ( + "", + StepResult( + skipped=True, + message="No applicable targets found for this server type", + ), + ) + ) return results for target in targets: @@ -257,9 +264,14 @@ def init_restrictions( name = target["name"] if dry_run: - results.append((name, StepResult( - message=f"Would create {path} with {len(all_ips)} IP(s)", - ))) + results.append( + ( + name, + StepResult( + message=f"Would create {path} with {len(all_ips)} IP(s)", + ), + ) + ) continue if file_exists(path): @@ -267,15 +279,25 @@ def init_restrictions( content = _render_htaccess(target, all_ips) if not write_file(path, content): - results.append((name, StepResult( - success=False, - error=f"Failed to write {path}", - ))) + results.append( + ( + name, + StepResult( + success=False, + error=f"Failed to write {path}", + ), + ) + ) continue - results.append((name, StepResult( - message=f"Created {path} with {len(all_ips)} IP(s)", - ))) + results.append( + ( + name, + StepResult( + message=f"Created {path} with {len(all_ips)} IP(s)", + ), + ) + ) # Save the full IP set to cache for reapply after upgrades if not dry_run: @@ -305,40 +327,65 @@ def add_restricted_ip( name = target["name"] if not file_exists(path): - results.append((name, StepResult( - skipped=True, - message=f"No .htaccess at {path} (run init first)", - ))) + results.append( + ( + name, + StepResult( + skipped=True, + message=f"No .htaccess at {path} (run init first)", + ), + ) + ) continue if not is_nssec_managed(path): - results.append((name, StepResult( - skipped=True, - message=f"Skipping unmanaged {path}", - ))) + results.append( + ( + name, + StepResult( + skipped=True, + message=f"Skipping unmanaged {path}", + ), + ) + ) continue current_ips = parse_htaccess_ips(path) if ip in current_ips: - results.append((name, StepResult( - skipped=True, - message=f"{ip} already in {path}", - ))) + results.append( + ( + name, + StepResult( + skipped=True, + message=f"{ip} already in {path}", + ), + ) + ) continue new_ips = current_ips + [ip] backup_file(path) content = _render_htaccess(target, new_ips) if not write_file(path, content): - results.append((name, StepResult( - success=False, - error=f"Failed to write {path}", - ))) + results.append( + ( + name, + StepResult( + success=False, + error=f"Failed to write {path}", + ), + ) + ) continue - results.append((name, StepResult( - message=f"Added {ip} to {path}", - ))) + results.append( + ( + name, + StepResult( + message=f"Added {ip} to {path}", + ), + ) + ) # Update cache with new IP cached = load_cached_ips() @@ -365,10 +412,15 @@ def remove_restricted_ip( List of (target_name, StepResult) tuples. """ if ip == "127.0.0.1": - return [("", StepResult( - success=False, - error="Cannot remove 127.0.0.1 (localhost must always be allowed)", - ))] + return [ + ( + "", + StepResult( + success=False, + error="Cannot remove 127.0.0.1 (localhost must always be allowed)", + ), + ) + ] targets = get_applicable_targets(server_type) results: list[tuple[str, StepResult]] = [] @@ -378,40 +430,65 @@ def remove_restricted_ip( name = target["name"] if not file_exists(path): - results.append((name, StepResult( - skipped=True, - message=f"No .htaccess at {path}", - ))) + results.append( + ( + name, + StepResult( + skipped=True, + message=f"No .htaccess at {path}", + ), + ) + ) continue if not is_nssec_managed(path): - results.append((name, StepResult( - skipped=True, - message=f"Skipping unmanaged {path}", - ))) + results.append( + ( + name, + StepResult( + skipped=True, + message=f"Skipping unmanaged {path}", + ), + ) + ) continue current_ips = parse_htaccess_ips(path) if ip not in current_ips: - results.append((name, StepResult( - skipped=True, - message=f"{ip} not found in {path}", - ))) + results.append( + ( + name, + StepResult( + skipped=True, + message=f"{ip} not found in {path}", + ), + ) + ) continue new_ips = [existing for existing in current_ips if existing != ip] backup_file(path) content = _render_htaccess(target, new_ips) if not write_file(path, content): - results.append((name, StepResult( - success=False, - error=f"Failed to write {path}", - ))) + results.append( + ( + name, + StepResult( + success=False, + error=f"Failed to write {path}", + ), + ) + ) continue - results.append((name, StepResult( - message=f"Removed {ip} from {path}", - ))) + results.append( + ( + name, + StepResult( + message=f"Removed {ip} from {path}", + ), + ) + ) # Update cache — remove this IP cached = load_cached_ips() @@ -439,10 +516,15 @@ def reapply_restrictions( """ cached_ips = load_cached_ips() if not cached_ips: - return [("", StepResult( - skipped=True, - message=f"No cached IPs found in {RESTRICT_CACHE_PATH} (run init first)", - ))] + return [ + ( + "", + StepResult( + skipped=True, + message=f"No cached IPs found in {RESTRICT_CACHE_PATH} (run init first)", + ), + ) + ] # Ensure 127.0.0.1 is first ips = ["127.0.0.1"] + [ip for ip in cached_ips if ip != "127.0.0.1"] @@ -451,10 +533,15 @@ def reapply_restrictions( results: list[tuple[str, StepResult]] = [] if not targets: - results.append(("", StepResult( - skipped=True, - message="No applicable targets found for this server type", - ))) + results.append( + ( + "", + StepResult( + skipped=True, + message="No applicable targets found for this server type", + ), + ) + ) return results for target in targets: @@ -462,9 +549,14 @@ def reapply_restrictions( name = target["name"] if dry_run: - results.append((name, StepResult( - message=f"Would write {path} with {len(ips)} cached IP(s)", - ))) + results.append( + ( + name, + StepResult( + message=f"Would write {path} with {len(ips)} cached IP(s)", + ), + ) + ) continue if file_exists(path): @@ -472,14 +564,24 @@ def reapply_restrictions( content = _render_htaccess(target, ips) if not write_file(path, content): - results.append((name, StepResult( - success=False, - error=f"Failed to write {path}", - ))) + results.append( + ( + name, + StepResult( + success=False, + error=f"Failed to write {path}", + ), + ) + ) continue - results.append((name, StepResult( - message=f"Restored {path} with {len(ips)} cached IP(s)", - ))) + results.append( + ( + name, + StepResult( + message=f"Restored {path} with {len(ips)} cached IP(s)", + ), + ) + ) return results diff --git a/src/nssec/modules/waf/status.py b/src/nssec/modules/waf/status.py index 72f02f1..25d31f9 100644 --- a/src/nssec/modules/waf/status.py +++ b/src/nssec/modules/waf/status.py @@ -5,7 +5,6 @@ import re from dataclasses import dataclass, field from pathlib import Path -from typing import Optional from nssec.modules.waf.config import ( CRS_RULES_REQUIRE_296, @@ -17,7 +16,6 @@ MODSEC_PACKAGE, NS_EXCLUSIONS_CONF, NS_EXCLUSIONS_HASH, - NS_EXCLUSIONS_VERSION, SECURITY2_CONF, SECURITY2_LOAD, ) @@ -27,25 +25,25 @@ class WafStatus: """Current state of ModSecurity / CRS.""" - apache_version: Optional[str] = None + apache_version: str | None = None apache_ppa: bool = False modsec_installed: bool = False modsec_enabled: bool = False - modsec_mode: Optional[str] = None + modsec_mode: str | None = None crs_installed: bool = False - crs_version: Optional[str] = None - crs_path: Optional[str] = None + crs_version: str | None = None + crs_path: str | None = None crs_setup_present: bool = False evasive_installed: bool = False evasive_enabled: bool = False exclusions_present: bool = False - exclusions_version: Optional[str] = None + exclusions_version: str | None = None exclusions_current: bool = False exclusions_included: bool = False crs_path_valid: bool = False exclusions_admin_ips: int = 0 exclusions_nodeping_ips: int = 0 - modsec_version: Optional[str] = None + modsec_version: str | None = None disabled_crs_rules: int = 0 audit_log_exists: bool = False recent_log_lines: list[str] = field(default_factory=list) @@ -66,7 +64,7 @@ def _pkg_installed(package: str) -> bool: return False -def _get_pkg_version(package: str) -> Optional[str]: +def _get_pkg_version(package: str) -> str | None: """Get the upstream version of an installed deb package.""" import subprocess @@ -96,7 +94,7 @@ def _is_ondrej_apache_ppa() -> bool: return any(glob.glob(p) for p in patterns) -def _read_file(path: str) -> Optional[str]: +def _read_file(path: str) -> str | None: try: return Path(path).read_text() except (OSError, PermissionError): @@ -114,7 +112,7 @@ def _tail_file(path: str, lines: int = 10) -> list[str]: return [] -def _parse_security2_crs_path(content: str) -> Optional[str]: +def _parse_security2_crs_path(content: str) -> str | None: """Extract the CRS path referenced in security2.conf.""" match = re.search(r"IncludeOptional\s+(\S+)/crs-setup\.conf", content) if match: @@ -122,7 +120,7 @@ def _parse_security2_crs_path(content: str) -> Optional[str]: return None -def _parse_exclusions_meta(content: str) -> tuple[Optional[str], Optional[str], int, int]: +def _parse_exclusions_meta(content: str) -> tuple[str | None, str | None, int, int]: """Parse exclusions file for version, hash, admin IP count, NodePing IP count.""" version = None template_hash = None @@ -171,8 +169,7 @@ def get_waf_status() -> WafStatus: rules_dir = Path(search_path) / "rules" if rules_dir.is_dir(): status.disabled_crs_rules = sum( - 1 for f in CRS_RULES_REQUIRE_296 - if (rules_dir / (f + ".disabled")).exists() + 1 for f in CRS_RULES_REQUIRE_296 if (rules_dir / (f + ".disabled")).exists() ) break @@ -203,9 +200,7 @@ def get_waf_status() -> WafStatus: if status.exclusions_present: excl_content = _read_file(NS_EXCLUSIONS_CONF) if excl_content: - version, deployed_hash, admin_count, np_count = _parse_exclusions_meta( - excl_content - ) + version, deployed_hash, admin_count, np_count = _parse_exclusions_meta(excl_content) status.exclusions_version = version # Use hash for drift detection — automatically catches any template change status.exclusions_current = deployed_hash == NS_EXCLUSIONS_HASH diff --git a/src/nssec/modules/waf/types.py b/src/nssec/modules/waf/types.py index 05a3739..be984fb 100644 --- a/src/nssec/modules/waf/types.py +++ b/src/nssec/modules/waf/types.py @@ -3,7 +3,6 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Optional @dataclass @@ -25,10 +24,10 @@ class PreflightResult: apache_running: bool = False modsec_installed: bool = False modsec_enabled: bool = False - modsec_mode: Optional[str] = None + modsec_mode: str | None = None crs_installed: bool = False - crs_version: Optional[str] = None - crs_path: Optional[str] = None + crs_version: str | None = None + crs_path: str | None = None security2_has_wildcard: bool = False security2_has_crs_load: bool = False errors: list[str] = field(default_factory=list) diff --git a/tests/conftest.py b/tests/conftest.py index 5c64131..43cd785 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,8 @@ """Shared pytest fixtures for nssec tests.""" +from unittest.mock import patch + import pytest -from unittest.mock import MagicMock, patch @pytest.fixture @@ -18,11 +19,11 @@ def mock_file_ops(): Patches at the point of use (nssec.modules.waf) not definition (utils). """ - with patch("nssec.modules.waf.file_exists") as exists, \ - patch("nssec.modules.waf.read_file") as read, \ - patch("nssec.modules.waf.write_file") as write, \ - patch("nssec.modules.waf.backup_file") as backup, \ - patch("nssec.modules.waf.render") as render_mock: + with patch("nssec.modules.waf.file_exists") as exists, patch( + "nssec.modules.waf.read_file" + ) as read, patch("nssec.modules.waf.write_file") as write, patch( + "nssec.modules.waf.backup_file" + ) as backup, patch("nssec.modules.waf.render") as render_mock: exists.return_value = True read.return_value = "" write.return_value = True @@ -55,4 +56,5 @@ def mock_preflight(): def cli_runner(): """Click CLI test runner.""" from click.testing import CliRunner + return CliRunner() diff --git a/tests/unit/test_waf_commands.py b/tests/unit/test_waf_commands.py index 28bb500..6565407 100644 --- a/tests/unit/test_waf_commands.py +++ b/tests/unit/test_waf_commands.py @@ -1,7 +1,8 @@ """Tests for WAF CLI commands.""" +from unittest.mock import MagicMock, patch + import pytest -from unittest.mock import patch, MagicMock from click.testing import CliRunner from nssec.cli.waf_commands import waf @@ -90,9 +91,9 @@ class TestWafRemove: def test_disables_security2_module(self, runner): """Should disable security2 Apache module.""" - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.modules.waf.utils.file_exists", return_value=True), \ - patch("nssec.modules.waf.utils.run_cmd") as mock_run: + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.modules.waf.utils.file_exists", return_value=True + ), patch("nssec.modules.waf.utils.run_cmd") as mock_run: mock_run.return_value = ("", "", 0) result = runner.invoke(waf, ["remove", "-y"]) @@ -104,8 +105,9 @@ def test_disables_security2_module(self, runner): def test_skips_if_already_disabled(self, runner): """Should skip if module already disabled.""" - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.modules.waf.utils.file_exists", return_value=False): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.modules.waf.utils.file_exists", return_value=False + ): result = runner.invoke(waf, ["remove", "-y"]) assert result.exit_code == 0 @@ -121,8 +123,9 @@ def test_requires_root(self, runner): def test_prompts_without_yes_flag(self, runner): """Should prompt for confirmation without -y flag.""" - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.modules.waf.utils.file_exists", return_value=True): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.modules.waf.utils.file_exists", return_value=True + ): result = runner.invoke(waf, ["remove"], input="n\n") assert "Aborted" in result.output @@ -133,8 +136,9 @@ class TestWafAllowlistAdd: def test_adds_ip_to_allowlist(self, runner, mock_installer): """Should add IP to allowlist.""" - with patch("nssec.modules.waf.get_allowlisted_ips", return_value=[]), \ - patch("nssec.modules.waf.add_allowlisted_ip") as mock_add: + with patch("nssec.modules.waf.get_allowlisted_ips", return_value=[]), patch( + "nssec.modules.waf.add_allowlisted_ip" + ) as mock_add: mock_add.return_value = MagicMock(success=True, message="Added") result = runner.invoke(waf, ["allowlist", "add", "192.168.1.100", "-y"]) @@ -164,8 +168,9 @@ class TestWafAllowlistDelete: def test_removes_ip_from_allowlist(self, runner, mock_installer): """Should remove IP from allowlist.""" - with patch("nssec.modules.waf.get_allowlisted_ips", return_value=["192.168.1.100"]), \ - patch("nssec.modules.waf.remove_allowlisted_ip") as mock_remove: + with patch("nssec.modules.waf.get_allowlisted_ips", return_value=["192.168.1.100"]), patch( + "nssec.modules.waf.remove_allowlisted_ip" + ) as mock_remove: mock_remove.return_value = MagicMock(success=True, message="Removed") result = runner.invoke(waf, ["allowlist", "delete", "192.168.1.100", "-y"]) @@ -195,7 +200,9 @@ class TestWafAllowlistShow: def test_shows_allowlisted_ips(self, runner): """Should display allowlisted IPs.""" - with patch("nssec.modules.waf.get_allowlisted_ips", return_value=["192.168.1.100", "10.0.0.0/8"]): + with patch( + "nssec.modules.waf.get_allowlisted_ips", return_value=["192.168.1.100", "10.0.0.0/8"] + ): result = runner.invoke(waf, ["allowlist", "show"]) assert result.exit_code == 0 @@ -301,8 +308,9 @@ class TestWafEvasiveStatus: def test_shows_enabled_status(self, runner): """Should show enabled status when evasive is active.""" - with patch("nssec.modules.waf.utils.package_installed", return_value=True), \ - patch("nssec.modules.waf.utils.file_exists", return_value=True): + with patch("nssec.modules.waf.utils.package_installed", return_value=True), patch( + "nssec.modules.waf.utils.file_exists", return_value=True + ): result = runner.invoke(waf, ["evasive", "status"]) assert result.exit_code == 0 @@ -318,8 +326,9 @@ def test_shows_not_installed(self, runner): def test_default_subcommand_shows_status(self, runner): """Running 'waf evasive' without subcommand should show status.""" - with patch("nssec.modules.waf.utils.package_installed", return_value=True), \ - patch("nssec.modules.waf.utils.file_exists", return_value=True): + with patch("nssec.modules.waf.utils.package_installed", return_value=True), patch( + "nssec.modules.waf.utils.file_exists", return_value=True + ): result = runner.invoke(waf, ["evasive"]) assert result.exit_code == 0 @@ -434,8 +443,9 @@ def test_requires_root(self, runner, mock_installer): """Should fail if not root.""" mock_installer.preflight.return_value.is_root = False - with patch("nssec.modules.waf.utils.detect_modsec_version", return_value="2.9.5"), \ - patch("nssec.modules.waf.utils.version_gte", return_value=False): + with patch("nssec.modules.waf.utils.detect_modsec_version", return_value="2.9.5"), patch( + "nssec.modules.waf.utils.version_gte", return_value=False + ): result = runner.invoke(waf, ["update", "-y"]) assert result.exit_code == 1 @@ -443,8 +453,9 @@ def test_requires_root(self, runner, mock_installer): def test_shows_instructions_when_old(self, runner, mock_installer): """Should show Digitalwave repo instructions when ModSec < 2.9.6.""" - with patch("nssec.modules.waf.utils.detect_modsec_version", return_value="2.9.5"), \ - patch("nssec.modules.waf.utils.version_gte", return_value=False): + with patch("nssec.modules.waf.utils.detect_modsec_version", return_value="2.9.5"), patch( + "nssec.modules.waf.utils.version_gte", return_value=False + ): result = runner.invoke(waf, ["update", "-y"]) assert result.exit_code == 0 @@ -457,8 +468,9 @@ def test_nothing_to_do_when_current_no_disabled(self, runner, mock_installer): mock_installer._reenable_crs_rules.return_value = [] mock_installer.preflight.return_value.crs_path = "/etc/modsecurity/crs" - with patch("nssec.modules.waf.utils.detect_modsec_version", return_value="2.9.7"), \ - patch("nssec.modules.waf.utils.version_gte", return_value=True): + with patch("nssec.modules.waf.utils.detect_modsec_version", return_value="2.9.7"), patch( + "nssec.modules.waf.utils.version_gte", return_value=True + ): result = runner.invoke(waf, ["update", "-y"]) assert result.exit_code == 0 @@ -479,8 +491,9 @@ def test_reenables_rules_after_upgrade(self, runner, mock_installer): reload_result.message = "Apache reloaded" mock_installer.reload_apache.return_value = reload_result - with patch("nssec.modules.waf.utils.detect_modsec_version", return_value="2.9.7"), \ - patch("nssec.modules.waf.utils.version_gte", return_value=True): + with patch("nssec.modules.waf.utils.detect_modsec_version", return_value="2.9.7"), patch( + "nssec.modules.waf.utils.version_gte", return_value=True + ): result = runner.invoke(waf, ["update", "-y"]) assert result.exit_code == 0 diff --git a/tests/unit/test_waf_module.py b/tests/unit/test_waf_module.py index 703b88c..4e1749a 100644 --- a/tests/unit/test_waf_module.py +++ b/tests/unit/test_waf_module.py @@ -1,8 +1,7 @@ """Tests for WAF module functions.""" -import pytest -from pathlib import Path -from unittest.mock import patch, MagicMock, call, ANY +from unittest.mock import MagicMock, patch + from jinja2 import Template @@ -304,8 +303,9 @@ def test_enables_evasive_module(self, mock_file_ops): """Should run a2enmod evasive when enabling.""" from nssec.modules.waf import ModSecurityInstaller - with patch("nssec.modules.waf.package_installed", return_value=True), \ - patch("nssec.modules.waf.run_cmd", return_value=("", "", 0)) as mock_run: + with patch("nssec.modules.waf.package_installed", return_value=True), patch( + "nssec.modules.waf.run_cmd", return_value=("", "", 0) + ) as mock_run: mock_file_ops["exists"].return_value = False # not currently enabled installer = ModSecurityInstaller() result = installer.set_evasive_state(enable=True) @@ -318,8 +318,9 @@ def test_disables_evasive_module(self, mock_file_ops): """Should run a2dismod evasive when disabling.""" from nssec.modules.waf import ModSecurityInstaller - with patch("nssec.modules.waf.package_installed", return_value=True), \ - patch("nssec.modules.waf.run_cmd", return_value=("", "", 0)) as mock_run: + with patch("nssec.modules.waf.package_installed", return_value=True), patch( + "nssec.modules.waf.run_cmd", return_value=("", "", 0) + ) as mock_run: mock_file_ops["exists"].return_value = True # currently enabled installer = ModSecurityInstaller() result = installer.set_evasive_state(enable=False) @@ -367,8 +368,9 @@ def test_dry_run_does_not_change_state(self, mock_file_ops): """Should not run commands in dry run mode.""" from nssec.modules.waf import ModSecurityInstaller - with patch("nssec.modules.waf.package_installed", return_value=True), \ - patch("nssec.modules.waf.run_cmd") as mock_run: + with patch("nssec.modules.waf.package_installed", return_value=True), patch( + "nssec.modules.waf.run_cmd" + ) as mock_run: mock_file_ops["exists"].return_value = False installer = ModSecurityInstaller(dry_run=True) result = installer.set_evasive_state(enable=True) @@ -381,8 +383,9 @@ def test_returns_error_on_command_failure(self, mock_file_ops): """Should return error if a2enmod/a2dismod fails.""" from nssec.modules.waf import ModSecurityInstaller - with patch("nssec.modules.waf.package_installed", return_value=True), \ - patch("nssec.modules.waf.run_cmd", return_value=("", "error", 1)): + with patch("nssec.modules.waf.package_installed", return_value=True), patch( + "nssec.modules.waf.run_cmd", return_value=("", "error", 1) + ): mock_file_ops["exists"].return_value = False installer = ModSecurityInstaller() result = installer.set_evasive_state(enable=True) @@ -399,8 +402,9 @@ def test_enable_mode_does_not_toggle_evasive(self, mock_file_ops): from nssec.modules.waf import ModSecurityInstaller mock_file_ops["read"].return_value = "SecRuleEngine DetectionOnly\n" - with patch("nssec.modules.waf.package_installed", return_value=True), \ - patch("nssec.modules.waf.run_cmd", return_value=("", "", 0)) as mock_run: + with patch("nssec.modules.waf.package_installed", return_value=True), patch( + "nssec.modules.waf.run_cmd", return_value=("", "", 0) + ) as mock_run: mock_file_ops["exists"].return_value = False installer = ModSecurityInstaller() result = installer.set_mode("On") @@ -415,8 +419,9 @@ def test_detect_mode_does_not_toggle_evasive(self, mock_file_ops): from nssec.modules.waf import ModSecurityInstaller mock_file_ops["read"].return_value = "SecRuleEngine On\n" - with patch("nssec.modules.waf.package_installed", return_value=True), \ - patch("nssec.modules.waf.run_cmd", return_value=("", "", 0)) as mock_run: + with patch("nssec.modules.waf.package_installed", return_value=True), patch( + "nssec.modules.waf.run_cmd", return_value=("", "", 0) + ) as mock_run: mock_file_ops["exists"].return_value = True installer = ModSecurityInstaller() result = installer.set_mode("DetectionOnly") @@ -471,8 +476,11 @@ def test_passes_nodeping_ips_to_template(self, mock_file_ops): assert result.success render_call = mock_file_ops["render"].call_args - assert render_call[1].get("nodeping_ips") == nodeping or \ - nodeping in render_call[0] if render_call[0] else False + assert ( + render_call[1].get("nodeping_ips") == nodeping or nodeping in render_call[0] + if render_call[0] + else False + ) def test_defaults_to_empty_nodeping_list(self, mock_file_ops): """Should default to empty list when no nodeping_ips provided.""" @@ -484,8 +492,9 @@ def test_defaults_to_empty_nodeping_list(self, mock_file_ops): assert result.success render_call = mock_file_ops["render"].call_args # nodeping_ips should be an empty list - assert render_call[1].get("nodeping_ips") == [] or \ - render_call.kwargs.get("nodeping_ips") == [] + assert ( + render_call[1].get("nodeping_ips") == [] or render_call.kwargs.get("nodeping_ips") == [] + ) def test_passes_both_admin_and_nodeping_ips(self, mock_file_ops): """Should pass both admin_ips and nodeping_ips to template.""" @@ -554,7 +563,7 @@ def test_nodeping_rules_bypass_crs(self): ) # Find the NodePing rule section lines = rendered.split("\n") - nodeping_rule_lines = [l for l in lines if "1000201" in l] + nodeping_rule_lines = [line for line in lines if "1000201" in line] assert len(nodeping_rule_lines) > 0 assert "ruleRemoveByTag=OWASP_CRS" in rendered @@ -574,8 +583,9 @@ def test_updates_crs_setup_when_v4_present(self, mock_file_ops): pf.can_proceed = True installer._preflight = pf - with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.7"), \ - patch("nssec.modules.waf.version_gte", return_value=True): + with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.7"), patch( + "nssec.modules.waf.version_gte", return_value=True + ): result = installer.install_crs_v4() assert result.skipped @@ -597,8 +607,9 @@ def test_skips_setup_update_renders_template(self, mock_file_ops): pf.can_proceed = True installer._preflight = pf - with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.7"), \ - patch("nssec.modules.waf.version_gte", return_value=True): + with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.7"), patch( + "nssec.modules.waf.version_gte", return_value=True + ): installer.install_crs_v4() # render should have been called for crs-setup.conf @@ -612,8 +623,9 @@ def test_clears_preflight_after_download(self, mock_file_ops): """Should clear _preflight cache after successful CRS download.""" from nssec.modules.waf import ModSecurityInstaller - with patch("nssec.modules.waf.run_cmd", return_value=("", "", 0)), \ - patch("nssec.modules.waf.Path"): + with patch("nssec.modules.waf.run_cmd", return_value=("", "", 0)), patch( + "nssec.modules.waf.Path" + ): installer = ModSecurityInstaller() pf = MagicMock() pf.crs_installed = False @@ -654,8 +666,9 @@ def test_disables_rules_on_old_modsec(self, tmp_path, mock_file_ops): rule_file = rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf" rule_file.write_text("# rule content") - with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.5"), \ - patch("nssec.modules.waf.version_gte", side_effect=lambda v, t: False): + with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.5"), patch( + "nssec.modules.waf.version_gte", side_effect=lambda v, t: False + ): installer = ModSecurityInstaller() disabled = installer._disable_incompatible_crs_rules(str(tmp_path)) @@ -672,8 +685,9 @@ def test_skips_on_new_modsec(self, tmp_path, mock_file_ops): rule_file = rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf" rule_file.write_text("# rule content") - with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.6"), \ - patch("nssec.modules.waf.version_gte", side_effect=lambda v, t: True): + with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.6"), patch( + "nssec.modules.waf.version_gte", side_effect=lambda v, t: True + ): installer = ModSecurityInstaller() disabled = installer._disable_incompatible_crs_rules(str(tmp_path)) @@ -690,8 +704,9 @@ def test_skips_already_disabled(self, tmp_path, mock_file_ops): (rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf").write_text("# rule") (rules_dir / "REQUEST-922-MULTIPART-ATTACK.conf.disabled").write_text("# disabled") - with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.5"), \ - patch("nssec.modules.waf.version_gte", side_effect=lambda v, t: False): + with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.5"), patch( + "nssec.modules.waf.version_gte", side_effect=lambda v, t: False + ): installer = ModSecurityInstaller() disabled = installer._disable_incompatible_crs_rules(str(tmp_path)) @@ -705,8 +720,9 @@ def test_handles_missing_rule_file(self, tmp_path, mock_file_ops): rules_dir.mkdir() # No rule files created - with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.5"), \ - patch("nssec.modules.waf.version_gte", side_effect=lambda v, t: False): + with patch("nssec.modules.waf.detect_modsec_version", return_value="2.9.5"), patch( + "nssec.modules.waf.version_gte", side_effect=lambda v, t: False + ): installer = ModSecurityInstaller() disabled = installer._disable_incompatible_crs_rules(str(tmp_path)) @@ -758,5 +774,3 @@ def test_returns_empty_when_nothing_disabled(self, tmp_path, mock_file_ops): reenabled = installer._reenable_crs_rules(str(tmp_path)) assert reenabled == [] - - diff --git a/tests/unit/test_waf_restrict.py b/tests/unit/test_waf_restrict.py index 37bfbb7..bdd491a 100644 --- a/tests/unit/test_waf_restrict.py +++ b/tests/unit/test_waf_restrict.py @@ -1,8 +1,8 @@ """Tests for WAF restrict module functions.""" import json -import pytest -from unittest.mock import patch, MagicMock +from unittest.mock import patch + from jinja2 import Template @@ -44,7 +44,6 @@ class TestSaveCachedIps: def test_saves_ips_to_json(self): """Should write IPs as JSON to cache file.""" from nssec.modules.waf.restrict import save_cached_ips - from nssec.modules.waf.config import RESTRICT_CACHE_PATH with patch("nssec.modules.waf.restrict.write_file", return_value=True) as mock_write: result = save_cached_ips(["127.0.0.1", "10.0.0.1"]) @@ -219,8 +218,8 @@ class TestIsNssecManaged: def test_returns_true_for_managed_file(self): """Should return True when marker is present.""" - from nssec.modules.waf.restrict import is_nssec_managed from nssec.modules.waf.config import RESTRICT_MANAGED_MARKER + from nssec.modules.waf.restrict import is_nssec_managed content = f"{RESTRICT_MANAGED_MARKER}\n\n\n" with patch("nssec.modules.waf.restrict.read_file", return_value=content): @@ -252,7 +251,9 @@ class TestInitRestrictions: @patch("nssec.modules.waf.restrict.file_exists", return_value=False) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) @patch("nssec.modules.waf.restrict.render", return_value="rendered") - def test_creates_htaccess_files(self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save, mock_collect): + def test_creates_htaccess_files( + self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save, mock_collect + ): """Should create .htaccess files for applicable targets.""" from nssec.modules.waf.restrict import init_restrictions @@ -270,7 +271,9 @@ def test_creates_htaccess_files(self, mock_render, mock_isdir, mock_exists, mock @patch("nssec.modules.waf.restrict.file_exists", return_value=True) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) @patch("nssec.modules.waf.restrict.render", return_value="rendered") - def test_overwrites_unmanaged_files(self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save, mock_collect): + def test_overwrites_unmanaged_files( + self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save, mock_collect + ): """Should overwrite existing unmanaged .htaccess files.""" from nssec.modules.waf.restrict import init_restrictions @@ -299,8 +302,9 @@ def test_no_targets_returns_skip(self): """Should return skip result when no targets apply.""" from nssec.modules.waf.restrict import init_restrictions - with patch("nssec.modules.waf.restrict.collect_existing_ips", return_value=[]), \ - patch("nssec.modules.waf.restrict.is_directory", return_value=False): + with patch("nssec.modules.waf.restrict.collect_existing_ips", return_value=[]), patch( + "nssec.modules.waf.restrict.is_directory", return_value=False + ): results = init_restrictions("core", ["192.168.1.100"]) assert len(results) == 1 @@ -313,7 +317,9 @@ def test_no_targets_returns_skip(self): @patch("nssec.modules.waf.restrict.file_exists", return_value=False) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) @patch("nssec.modules.waf.restrict.render", return_value="rendered") - def test_always_includes_localhost(self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save, mock_collect): + def test_always_includes_localhost( + self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save, mock_collect + ): """Should always include 127.0.0.1 in rendered IPs.""" from nssec.modules.waf.restrict import init_restrictions @@ -331,7 +337,9 @@ def test_always_includes_localhost(self, mock_render, mock_isdir, mock_exists, m @patch("nssec.modules.waf.restrict.file_exists", return_value=True) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) @patch("nssec.modules.waf.restrict.render", return_value="rendered") - def test_merges_existing_ips_from_all_targets(self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save, mock_collect): + def test_merges_existing_ips_from_all_targets( + self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save, mock_collect + ): """Should merge existing IPs from all targets into every file.""" from nssec.modules.waf.restrict import init_restrictions @@ -355,7 +363,9 @@ def test_merges_existing_ips_from_all_targets(self, mock_render, mock_isdir, moc @patch("nssec.modules.waf.restrict.file_exists", return_value=False) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) @patch("nssec.modules.waf.restrict.render", return_value="rendered") - def test_merges_ips_from_cache(self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save, mock_collect): + def test_merges_ips_from_cache( + self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save, mock_collect + ): """Should merge IPs from cache file into new .htaccess files.""" from nssec.modules.waf.restrict import init_restrictions @@ -376,7 +386,9 @@ def test_merges_ips_from_cache(self, mock_render, mock_isdir, mock_exists, mock_ @patch("nssec.modules.waf.restrict.file_exists", return_value=False) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) @patch("nssec.modules.waf.restrict.render", return_value="rendered") - def test_saves_ips_to_cache_after_init(self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_collect): + def test_saves_ips_to_cache_after_init( + self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_collect + ): """Should save IPs to cache file after successful init.""" from nssec.modules.waf.restrict import init_restrictions @@ -399,8 +411,9 @@ def test_collects_ips_from_existing_htaccess(self, mock_isdir, mock_cache): from nssec.modules.waf.restrict import collect_existing_ips content = "Require ip 127.0.0.1\nRequire ip 10.0.0.5\n" - with patch("nssec.modules.waf.restrict.file_exists", return_value=True), \ - patch("nssec.modules.waf.restrict.read_file", return_value=content): + with patch("nssec.modules.waf.restrict.file_exists", return_value=True), patch( + "nssec.modules.waf.restrict.read_file", return_value=content + ): ips = collect_existing_ips("core") assert "10.0.0.5" in ips @@ -411,8 +424,9 @@ def test_collects_ips_from_cache(self, mock_isdir): """Should include IPs from the cache file.""" from nssec.modules.waf.restrict import collect_existing_ips - with patch("nssec.modules.waf.restrict.file_exists", return_value=False), \ - patch("nssec.modules.waf.restrict.load_cached_ips", return_value=["127.0.0.1", "172.16.0.1"]): + with patch("nssec.modules.waf.restrict.file_exists", return_value=False), patch( + "nssec.modules.waf.restrict.load_cached_ips", return_value=["127.0.0.1", "172.16.0.1"] + ): ips = collect_existing_ips("core") assert "172.16.0.1" in ips @@ -435,8 +449,9 @@ def test_collects_legacy_allow_from_ips(self, mock_isdir, mock_cache): from nssec.modules.waf.restrict import collect_existing_ips content = "Order deny,allow\nDeny from all\nAllow from 10.0.0.5\n" - with patch("nssec.modules.waf.restrict.file_exists", return_value=True), \ - patch("nssec.modules.waf.restrict.read_file", return_value=content): + with patch("nssec.modules.waf.restrict.file_exists", return_value=True), patch( + "nssec.modules.waf.restrict.read_file", return_value=content + ): ips = collect_existing_ips("core") assert "10.0.0.5" in ips @@ -451,12 +466,13 @@ class TestInitRestrictionsNoMerge: @patch("nssec.modules.waf.restrict.file_exists", return_value=True) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) @patch("nssec.modules.waf.restrict.render", return_value="rendered") - def test_does_not_merge_existing_when_disabled(self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save): + def test_does_not_merge_existing_when_disabled( + self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save + ): """Should not merge existing IPs when merge_existing=False.""" from nssec.modules.waf.restrict import init_restrictions - results = init_restrictions("core", ["192.168.1.100"], - merge_existing=False) + results = init_restrictions("core", ["192.168.1.100"], merge_existing=False) for name, result in results: assert result.success @@ -474,12 +490,13 @@ def test_does_not_merge_existing_when_disabled(self, mock_render, mock_isdir, mo @patch("nssec.modules.waf.restrict.file_exists", return_value=False) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) @patch("nssec.modules.waf.restrict.render", return_value="rendered") - def test_does_not_merge_cache_when_disabled(self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save): + def test_does_not_merge_cache_when_disabled( + self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write, mock_save + ): """Should not merge cache IPs when merge_existing=False.""" from nssec.modules.waf.restrict import init_restrictions - results = init_restrictions("core", ["192.168.1.100"], - merge_existing=False) + init_restrictions("core", ["192.168.1.100"], merge_existing=False) # Render should only include provided IPs + localhost for call_args in mock_render.call_args_list: @@ -500,7 +517,18 @@ class TestAddRestrictedIp: @patch("nssec.modules.waf.restrict.parse_htaccess_ips", return_value=["127.0.0.1"]) @patch("nssec.modules.waf.restrict.file_exists", return_value=True) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) - def test_adds_ip_to_managed_files(self, mock_isdir, mock_exists, mock_parse, mock_managed, mock_render, mock_backup, mock_write, mock_load, mock_save): + def test_adds_ip_to_managed_files( + self, + mock_isdir, + mock_exists, + mock_parse, + mock_managed, + mock_render, + mock_backup, + mock_write, + mock_load, + mock_save, + ): """Should add IP to all managed .htaccess files and update cache.""" from nssec.modules.waf.restrict import add_restricted_ip @@ -516,7 +544,9 @@ def test_adds_ip_to_managed_files(self, mock_isdir, mock_exists, mock_parse, moc assert "192.168.1.100" in saved_ips @patch("nssec.modules.waf.restrict.is_nssec_managed", return_value=True) - @patch("nssec.modules.waf.restrict.parse_htaccess_ips", return_value=["127.0.0.1", "192.168.1.100"]) + @patch( + "nssec.modules.waf.restrict.parse_htaccess_ips", return_value=["127.0.0.1", "192.168.1.100"] + ) @patch("nssec.modules.waf.restrict.file_exists", return_value=True) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) def test_skips_duplicate_ip(self, mock_isdir, mock_exists, mock_parse, mock_managed): @@ -569,15 +599,30 @@ def test_refuses_to_remove_localhost(self): assert "Cannot remove 127.0.0.1" in results[0][1].error @patch("nssec.modules.waf.restrict.save_cached_ips") - @patch("nssec.modules.waf.restrict.load_cached_ips", return_value=["127.0.0.1", "192.168.1.100"]) + @patch( + "nssec.modules.waf.restrict.load_cached_ips", return_value=["127.0.0.1", "192.168.1.100"] + ) @patch("nssec.modules.waf.restrict.write_file", return_value=True) @patch("nssec.modules.waf.restrict.backup_file") @patch("nssec.modules.waf.restrict.render", return_value="rendered") @patch("nssec.modules.waf.restrict.is_nssec_managed", return_value=True) - @patch("nssec.modules.waf.restrict.parse_htaccess_ips", return_value=["127.0.0.1", "192.168.1.100"]) + @patch( + "nssec.modules.waf.restrict.parse_htaccess_ips", return_value=["127.0.0.1", "192.168.1.100"] + ) @patch("nssec.modules.waf.restrict.file_exists", return_value=True) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) - def test_removes_user_added_ip(self, mock_isdir, mock_exists, mock_parse, mock_managed, mock_render, mock_backup, mock_write, mock_load, mock_save): + def test_removes_user_added_ip( + self, + mock_isdir, + mock_exists, + mock_parse, + mock_managed, + mock_render, + mock_backup, + mock_write, + mock_load, + mock_save, + ): """Should remove a user-added IP from managed files and update cache.""" from nssec.modules.waf.restrict import remove_restricted_ip @@ -695,7 +740,9 @@ def test_returns_skip_when_no_cache(self): @patch("nssec.modules.waf.restrict.file_exists", return_value=False) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) @patch("nssec.modules.waf.restrict.render", return_value="rendered") - def test_restores_from_cache(self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write): + def test_restores_from_cache( + self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write + ): """Should re-create .htaccess files from cached IPs.""" from nssec.modules.waf.restrict import reapply_restrictions @@ -719,9 +766,9 @@ def test_dry_run_does_not_write(self, mock_isdir): from nssec.modules.waf.restrict import reapply_restrictions cached = json.dumps({"ips": ["127.0.0.1", "10.0.0.1"]}) - with patch("nssec.modules.waf.restrict.read_file", return_value=cached), \ - patch("nssec.modules.waf.restrict.file_exists", return_value=False), \ - patch("nssec.modules.waf.restrict.write_file") as mock_write: + with patch("nssec.modules.waf.restrict.read_file", return_value=cached), patch( + "nssec.modules.waf.restrict.file_exists", return_value=False + ), patch("nssec.modules.waf.restrict.write_file") as mock_write: results = reapply_restrictions("core", dry_run=True) mock_write.assert_not_called() @@ -734,7 +781,9 @@ def test_dry_run_does_not_write(self, mock_isdir): @patch("nssec.modules.waf.restrict.file_exists", return_value=True) @patch("nssec.modules.waf.restrict.is_directory", return_value=True) @patch("nssec.modules.waf.restrict.render", return_value="rendered") - def test_backs_up_existing_before_overwrite(self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write): + def test_backs_up_existing_before_overwrite( + self, mock_render, mock_isdir, mock_exists, mock_backup, mock_write + ): """Should backup existing files before restoring from cache.""" from nssec.modules.waf.restrict import reapply_restrictions diff --git a/tests/unit/test_waf_restrict_commands.py b/tests/unit/test_waf_restrict_commands.py index facde43..1a4d6cb 100644 --- a/tests/unit/test_waf_restrict_commands.py +++ b/tests/unit/test_waf_restrict_commands.py @@ -1,7 +1,8 @@ """Tests for WAF restrict CLI commands.""" +from unittest.mock import MagicMock, patch + import pytest -from unittest.mock import patch, MagicMock from click.testing import CliRunner from nssec.cli.waf_commands import waf @@ -34,8 +35,9 @@ def test_shows_status_table(self, runner): "ips": [], }, ] - with patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.get_restrict_status", return_value=statuses): + with patch("nssec.core.server_types.detect_server_type") as mock_detect, patch( + "nssec.modules.waf.restrict.get_restrict_status", return_value=statuses + ): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "show"]) @@ -45,8 +47,9 @@ def test_shows_status_table(self, runner): def test_shows_empty_message(self, runner): """Should show message when no targets apply.""" - with patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.get_restrict_status", return_value=[]): + with patch("nssec.core.server_types.detect_server_type") as mock_detect, patch( + "nssec.modules.waf.restrict.get_restrict_status", return_value=[] + ): mock_detect.return_value = MagicMock(value="unknown") result = runner.invoke(waf, ["restrict", "show"]) @@ -64,8 +67,9 @@ def test_default_subcommand_shows_status(self, runner): "ips": ["127.0.0.1"], }, ] - with patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.get_restrict_status", return_value=statuses): + with patch("nssec.core.server_types.detect_server_type") as mock_detect, patch( + "nssec.modules.waf.restrict.get_restrict_status", return_value=statuses + ): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict"]) @@ -83,8 +87,9 @@ def test_lists_ips_from_first_managed_file(self, runner): "ips": ["127.0.0.1", "10.0.0.1"], }, ] - with patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.get_restrict_status", return_value=statuses): + with patch("nssec.core.server_types.detect_server_type") as mock_detect, patch( + "nssec.modules.waf.restrict.get_restrict_status", return_value=statuses + ): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "show"]) @@ -113,11 +118,13 @@ def test_creates_htaccess_files(self, runner): ("ns-api", StepResult(message="Created file")), ] - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.collect_existing_ips", return_value=[]), \ - patch("nssec.modules.waf.restrict.init_restrictions", return_value=mock_results) as mock_init, \ - patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch( + "nssec.modules.waf.restrict.collect_existing_ips", return_value=[] + ), patch( + "nssec.modules.waf.restrict.init_restrictions", return_value=mock_results + ) as mock_init, patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "init", "--ip", "192.168.1.100", "-y"]) @@ -128,9 +135,9 @@ def test_creates_htaccess_files(self, runner): def test_validates_ip_address(self, runner): """Should reject invalid IP addresses.""" - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.collect_existing_ips", return_value=[]): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch("nssec.modules.waf.restrict.collect_existing_ips", return_value=[]): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "init", "--ip", "not-an-ip", "-y"]) @@ -145,10 +152,11 @@ def test_dry_run(self, runner): ("SiPbx Admin UI", StepResult(message="Would create file")), ] - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.collect_existing_ips", return_value=[]), \ - patch("nssec.modules.waf.restrict.init_restrictions", return_value=mock_results): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch( + "nssec.modules.waf.restrict.collect_existing_ips", return_value=[] + ), patch("nssec.modules.waf.restrict.init_restrictions", return_value=mock_results): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "init", "--ip", "10.0.0.1", "--dry-run"]) @@ -163,11 +171,13 @@ def test_accepts_cidr_notation(self, runner): ("SiPbx Admin UI", StepResult(message="Created file")), ] - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.collect_existing_ips", return_value=[]), \ - patch("nssec.modules.waf.restrict.init_restrictions", return_value=mock_results), \ - patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch( + "nssec.modules.waf.restrict.collect_existing_ips", return_value=[] + ), patch("nssec.modules.waf.restrict.init_restrictions", return_value=mock_results), patch( + "nssec.modules.waf.utils.run_cmd", return_value=("", "", 0) + ): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "init", "--ip", "10.0.0.0/8", "-y"]) @@ -181,15 +191,19 @@ def test_shows_existing_ips_and_keeps_by_default(self, runner): ("SiPbx Admin UI", StepResult(message="Created file")), ] - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.collect_existing_ips", return_value=["10.0.0.5", "172.16.0.1"]), \ - patch("nssec.modules.waf.restrict.init_restrictions", return_value=mock_results) as mock_init, \ - patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch( + "nssec.modules.waf.restrict.collect_existing_ips", + return_value=["10.0.0.5", "172.16.0.1"], + ), patch( + "nssec.modules.waf.restrict.init_restrictions", return_value=mock_results + ) as mock_init, patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): mock_detect.return_value = MagicMock(value="core") # Confirm keep=Yes, create=Yes, reload=Yes - result = runner.invoke(waf, ["restrict", "init", "--ip", "192.168.1.100"], - input="y\ny\ny\n") + result = runner.invoke( + waf, ["restrict", "init", "--ip", "192.168.1.100"], input="y\ny\ny\n" + ) assert result.exit_code == 0 assert "10.0.0.5" in result.output @@ -206,15 +220,18 @@ def test_shows_existing_ips_and_overwrites_on_no(self, runner): ("SiPbx Admin UI", StepResult(message="Created file")), ] - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.collect_existing_ips", return_value=["10.0.0.5"]), \ - patch("nssec.modules.waf.restrict.init_restrictions", return_value=mock_results) as mock_init, \ - patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch( + "nssec.modules.waf.restrict.collect_existing_ips", return_value=["10.0.0.5"] + ), patch( + "nssec.modules.waf.restrict.init_restrictions", return_value=mock_results + ) as mock_init, patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): mock_detect.return_value = MagicMock(value="core") # Confirm keep=No, create=Yes, reload=Yes - result = runner.invoke(waf, ["restrict", "init", "--ip", "192.168.1.100"], - input="n\ny\ny\n") + result = runner.invoke( + waf, ["restrict", "init", "--ip", "192.168.1.100"], input="n\ny\ny\n" + ) assert result.exit_code == 0 assert "Overwriting" in result.output @@ -230,11 +247,13 @@ def test_yes_flag_keeps_existing_ips_by_default(self, runner): ("SiPbx Admin UI", StepResult(message="Created file")), ] - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.collect_existing_ips", return_value=["10.0.0.5"]), \ - patch("nssec.modules.waf.restrict.init_restrictions", return_value=mock_results) as mock_init, \ - patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch( + "nssec.modules.waf.restrict.collect_existing_ips", return_value=["10.0.0.5"] + ), patch( + "nssec.modules.waf.restrict.init_restrictions", return_value=mock_results + ) as mock_init, patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "init", "--ip", "192.168.1.100", "-y"]) @@ -263,10 +282,11 @@ def test_adds_ip_to_managed_files(self, runner): ("SiPbx Admin UI", StepResult(message="Added 192.168.1.100")), ] - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.add_restricted_ip", return_value=mock_results) as mock_add, \ - patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch( + "nssec.modules.waf.restrict.add_restricted_ip", return_value=mock_results + ) as mock_add, patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "add", "192.168.1.100", "-y"]) @@ -275,8 +295,9 @@ def test_adds_ip_to_managed_files(self, runner): def test_validates_ip_address(self, runner): """Should reject invalid IP addresses.""" - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect: + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect: mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "add", "not-valid", "-y"]) @@ -303,10 +324,11 @@ def test_removes_ip_from_managed_files(self, runner): ("SiPbx Admin UI", StepResult(message="Removed 192.168.1.100")), ] - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.remove_restricted_ip", return_value=mock_results) as mock_remove, \ - patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch( + "nssec.modules.waf.restrict.remove_restricted_ip", return_value=mock_results + ) as mock_remove, patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "remove", "192.168.1.100", "-y"]) @@ -318,12 +340,20 @@ def test_blocks_localhost_removal(self, runner): from nssec.modules.waf.types import StepResult mock_results = [ - ("", StepResult(success=False, error="Cannot remove 127.0.0.1 (localhost must always be allowed)")), + ( + "", + StepResult( + success=False, + error="Cannot remove 127.0.0.1 (localhost must always be allowed)", + ), + ), ] - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.remove_restricted_ip", return_value=mock_results): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch( + "nssec.modules.waf.restrict.remove_restricted_ip", return_value=mock_results + ): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "remove", "127.0.0.1", "-y"]) @@ -350,11 +380,13 @@ def test_restores_from_cache(self, runner): ("SiPbx Admin UI", StepResult(message="Restored file")), ] - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.load_cached_ips", return_value=["127.0.0.1", "10.0.0.1"]), \ - patch("nssec.modules.waf.restrict.reapply_restrictions", return_value=mock_results) as mock_reapply, \ - patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch( + "nssec.modules.waf.restrict.load_cached_ips", return_value=["127.0.0.1", "10.0.0.1"] + ), patch( + "nssec.modules.waf.restrict.reapply_restrictions", return_value=mock_results + ) as mock_reapply, patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "reapply", "-y"]) @@ -369,11 +401,13 @@ def test_shows_cached_ips(self, runner): ("SiPbx Admin UI", StepResult(message="Restored file")), ] - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.load_cached_ips", return_value=["127.0.0.1", "10.0.0.1"]), \ - patch("nssec.modules.waf.restrict.reapply_restrictions", return_value=mock_results), \ - patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch( + "nssec.modules.waf.restrict.load_cached_ips", return_value=["127.0.0.1", "10.0.0.1"] + ), patch( + "nssec.modules.waf.restrict.reapply_restrictions", return_value=mock_results + ), patch("nssec.modules.waf.utils.run_cmd", return_value=("", "", 0)): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "reapply", "-y"]) @@ -387,10 +421,11 @@ def test_dry_run(self, runner): ("SiPbx Admin UI", StepResult(message="Would write file")), ] - with patch("nssec.core.ssh.is_root", return_value=True), \ - patch("nssec.core.server_types.detect_server_type") as mock_detect, \ - patch("nssec.modules.waf.restrict.load_cached_ips", return_value=["127.0.0.1"]), \ - patch("nssec.modules.waf.restrict.reapply_restrictions", return_value=mock_results): + with patch("nssec.core.ssh.is_root", return_value=True), patch( + "nssec.core.server_types.detect_server_type" + ) as mock_detect, patch( + "nssec.modules.waf.restrict.load_cached_ips", return_value=["127.0.0.1"] + ), patch("nssec.modules.waf.restrict.reapply_restrictions", return_value=mock_results): mock_detect.return_value = MagicMock(value="core") result = runner.invoke(waf, ["restrict", "reapply", "--dry-run"]) diff --git a/tests/unit/test_waf_status.py b/tests/unit/test_waf_status.py index e04bb15..d880e59 100644 --- a/tests/unit/test_waf_status.py +++ b/tests/unit/test_waf_status.py @@ -1,7 +1,5 @@ """Tests for WAF status reporting.""" -import pytest -from unittest.mock import patch, MagicMock from jinja2 import Template diff --git a/tests/unit/test_waf_utils.py b/tests/unit/test_waf_utils.py index 2f80b19..efdb9fc 100644 --- a/tests/unit/test_waf_utils.py +++ b/tests/unit/test_waf_utils.py @@ -2,8 +2,6 @@ from unittest.mock import patch -import pytest - class TestVersionGte: """Tests for version_gte helper.""" @@ -158,9 +156,9 @@ def capture_write(path, content): written["content"] = content return True - with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2), \ - patch("nssec.modules.waf.utils.backup_file"), \ - patch("nssec.modules.waf.utils.write_file", side_effect=capture_write): + with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2), patch( + "nssec.modules.waf.utils.backup_file" + ), patch("nssec.modules.waf.utils.write_file", side_effect=capture_write): result = append_crs_to_security2("/etc/modsecurity/crs") assert result is True @@ -179,9 +177,9 @@ def capture_write(path, content): written["content"] = content return True - with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2), \ - patch("nssec.modules.waf.utils.backup_file"), \ - patch("nssec.modules.waf.utils.write_file", side_effect=capture_write): + with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2), patch( + "nssec.modules.waf.utils.backup_file" + ), patch("nssec.modules.waf.utils.write_file", side_effect=capture_write): append_crs_to_security2("/etc/modsecurity/crs") content = written["content"] @@ -205,18 +203,19 @@ def capture_write(path, content): written["content"] = content return True - with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2), \ - patch("nssec.modules.waf.utils.backup_file"), \ - patch("nssec.modules.waf.utils.write_file", side_effect=capture_write): + with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2), patch( + "nssec.modules.waf.utils.backup_file" + ), patch("nssec.modules.waf.utils.write_file", side_effect=capture_write): append_crs_to_security2("/etc/modsecurity/crs") content = written["content"] # The modsecurity/*.conf line should remain active (not commented) active_lines = [ - l.strip() for l in content.splitlines() - if not l.strip().startswith("#") and l.strip() + line.strip() + for line in content.splitlines() + if not line.strip().startswith("#") and line.strip() ] - assert any("/etc/modsecurity/*.conf" in l for l in active_lines) + assert any("/etc/modsecurity/*.conf" in line for line in active_lines) def test_does_not_comment_out_already_commented_lines(self): """Should not double-comment already commented CRS lines.""" @@ -235,9 +234,9 @@ def capture_write(path, content): written["content"] = content return True - with patch("nssec.modules.waf.utils.read_file", return_value=content_with_comment), \ - patch("nssec.modules.waf.utils.backup_file"), \ - patch("nssec.modules.waf.utils.write_file", side_effect=capture_write): + with patch("nssec.modules.waf.utils.read_file", return_value=content_with_comment), patch( + "nssec.modules.waf.utils.backup_file" + ), patch("nssec.modules.waf.utils.write_file", side_effect=capture_write): append_crs_to_security2("/etc/modsecurity/crs") content = written["content"] @@ -247,9 +246,9 @@ def capture_write(path, content): def test_returns_false_on_write_failure(self): from nssec.modules.waf.utils import append_crs_to_security2 - with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2), \ - patch("nssec.modules.waf.utils.backup_file"), \ - patch("nssec.modules.waf.utils.write_file", return_value=False): + with patch("nssec.modules.waf.utils.read_file", return_value=DEBIAN_DEFAULT_SEC2), patch( + "nssec.modules.waf.utils.backup_file" + ), patch("nssec.modules.waf.utils.write_file", return_value=False): result = append_crs_to_security2("/etc/modsecurity/crs") assert result is False