From bbef2263bab6ca1407fc3226590cf888512e6bb7 Mon Sep 17 00:00:00 2001 From: Dennis Braun Date: Fri, 24 Apr 2026 21:50:48 +0200 Subject: [PATCH] feat: add segment saturation events --- app/blueprints/segment_bp.py | 75 +++++- app/i18n/de.json | 19 ++ app/i18n/en.json | 19 ++ app/i18n/es.json | 19 ++ app/i18n/fr.json | 19 ++ app/i18n/template.json | 19 ++ app/static/css/segment-utilization.css | 139 ++++++++++ app/static/js/segment-utilization.js | 210 +++++++++++++++ app/storage/segment_utilization.py | 216 ++++++++++++++++ app/templates/segment_utilization_tab.html | 10 + tests/modules/test_fritzbox_cable_routes.py | 181 ++++++++++++- tests/modules/test_fritzbox_cable_storage.py | 254 ++++++++++++++++++- 12 files changed, 1174 insertions(+), 6 deletions(-) diff --git a/app/blueprints/segment_bp.py b/app/blueprints/segment_bp.py index 6e20a371..c23b7adb 100644 --- a/app/blueprints/segment_bp.py +++ b/app/blueprints/segment_bp.py @@ -33,6 +33,13 @@ def _get_storage(): RANGE_HOURS = {"24h": 24, "7d": 168, "30d": 720, "all": 0} +def _normalize_range_key(raw, default): + """Return ``raw`` if it's a recognized range key, otherwise ``default``.""" + if raw in RANGE_HOURS: + return raw + return default + + @segment_bp.route("/api/fritzbox/segment-utilization") @require_auth def api_segment_utilization(): @@ -50,8 +57,8 @@ def api_segment_utilization(): if not storage: return jsonify({"error": "Storage unavailable"}), 503 - range_key = request.args.get("range", "24h") - hours = RANGE_HOURS.get(range_key, 24) + range_key = _normalize_range_key(request.args.get("range"), "24h") + hours = RANGE_HOURS[range_key] if hours > 0: start = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%SZ") @@ -66,10 +73,72 @@ def api_segment_utilization(): }) +def _clamp_int(value, default, lo, hi): + """Parse an int-ish query param and clamp it to [lo, hi]. Falls back + to ``default`` on parse errors or None.""" + if value is None or value == "": + return default + try: + parsed = int(value) + except (TypeError, ValueError): + return default + if parsed < lo: + return lo + if parsed > hi: + return hi + return parsed + + +@segment_bp.route("/api/fritzbox/segment-utilization/events") +@require_auth +def api_segment_utilization_events(): + """Return detected segment saturation events for the requested range.""" + config = get_config_manager() + t = get_translations(_get_lang()) + if not config: + return jsonify({"error": t.get("seg_unavailable", "Configuration unavailable.")}), 503 + if config.get("modem_type") != "fritzbox": + return jsonify({"error": t.get("seg_unsupported_driver", "This view is only available for FRITZ!Box cable devices.")}), 400 + if not config.is_segment_utilization_enabled(): + return jsonify({"error": t.get("seg_disabled", "Segment utilization is disabled in Settings.")}), 400 + + storage = _get_storage() + if not storage: + return jsonify({"error": "Storage unavailable"}), 503 + + threshold = _clamp_int(request.args.get("threshold"), default=80, lo=1, hi=100) + min_minutes = _clamp_int(request.args.get("min_minutes"), default=3, lo=1, hi=1440) + + range_key = _normalize_range_key(request.args.get("range"), "7d") + hours = RANGE_HOURS[range_key] + if hours > 0: + start = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%SZ") + else: + start = "2000-01-01T00:00:00Z" + end = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + events = storage.get_events(start, end, threshold=threshold, min_minutes=min_minutes) + return jsonify({ + "events": events, + "threshold": threshold, + "min_minutes": min_minutes, + "range": range_key, + }) + + @segment_bp.route("/api/fritzbox/segment-utilization/range") @require_auth def api_segment_utilization_range(): - """Return segment data for a time range (used by correlation graph).""" + """Return segment data for a time range (used by correlation graph). + + Returns an empty list — rather than an error — when configuration is + unavailable, the driver is unsupported, or the feature is disabled. + This matches ``/api/weather/range`` so the correlation view degrades + gracefully when optional data sources are absent. + """ + config = get_config_manager() + if not config or config.get("modem_type") != "fritzbox" or not config.is_segment_utilization_enabled(): + return jsonify([]) storage = _get_storage() if not storage: return jsonify([]) diff --git a/app/i18n/de.json b/app/i18n/de.json index 48d8898e..608c0373 100644 --- a/app/i18n/de.json +++ b/app/i18n/de.json @@ -795,6 +795,25 @@ "seg_unavailable": "Konfiguration nicht verfügbar.", "seg_correlation_ds": "Segment DS Last (%)", "seg_correlation_us": "Segment US Last (%)", + "seg_events_title": "Segment-Auslastungsereignisse", + "seg_events_subtitle": "Zeiträume, in denen Downstream- oder Upstream-Last auf oder über dem Schwellwert lag.", + "seg_events_meta": "Schwellwert {th}% für {min}+ Min", + "seg_events_loading": "Auslastungsereignisse werden geladen...", + "seg_events_empty": "Keine Auslastungsereignisse in diesem Zeitraum gefunden.", + "seg_events_error": "Auslastungsereignisse konnten nicht geladen werden.", + "seg_events_direction_ds": "Downstream", + "seg_events_direction_us": "Upstream", + "seg_events_peak_total": "Peak Gesamt", + "seg_events_peak_own": "Peak Eigen", + "seg_events_peak_neighbor": "Peak Nachbarlast", + "seg_events_confidence": "Konfidenz", + "seg_events_confidence_high": "hoch", + "seg_events_confidence_medium": "mittel", + "seg_events_confidence_low": "niedrig", + "seg_events_correlate": "In Korrelation öffnen", + "seg_events_correlate_out_of_range": "Die Korrelationsansicht unterstützt bis zu 7 Tage.", + "seg_events_min_short": "Min", + "seg_events_h_short": "Std", "sidebar_analysis": "Analyse", "smart_capture": "Smart Capture", "sc_enable": "Smart Capture aktivieren", diff --git a/app/i18n/en.json b/app/i18n/en.json index 67d03a4b..e7e0e131 100644 --- a/app/i18n/en.json +++ b/app/i18n/en.json @@ -795,6 +795,25 @@ "seg_unavailable": "Configuration unavailable.", "seg_correlation_ds": "Segment DS Load (%)", "seg_correlation_us": "Segment US Load (%)", + "seg_events_title": "Segment Saturation Events", + "seg_events_subtitle": "Periods where downstream or upstream load stayed at or above the threshold.", + "seg_events_meta": "Threshold {th}% for {min}+ min", + "seg_events_loading": "Loading saturation events...", + "seg_events_empty": "No saturation events detected in this range.", + "seg_events_error": "Could not load saturation events.", + "seg_events_direction_ds": "Downstream", + "seg_events_direction_us": "Upstream", + "seg_events_peak_total": "Peak total", + "seg_events_peak_own": "Peak own", + "seg_events_peak_neighbor": "Peak neighbor load", + "seg_events_confidence": "Confidence", + "seg_events_confidence_high": "high", + "seg_events_confidence_medium": "medium", + "seg_events_confidence_low": "low", + "seg_events_correlate": "Open in correlation", + "seg_events_correlate_out_of_range": "Correlation view supports up to 7 days.", + "seg_events_min_short": "min", + "seg_events_h_short": "h", "sidebar_analysis": "Analysis", "smart_capture": "Smart Capture", "sc_enable": "Enable Smart Capture", diff --git a/app/i18n/es.json b/app/i18n/es.json index 45e175d2..476d8881 100644 --- a/app/i18n/es.json +++ b/app/i18n/es.json @@ -790,6 +790,25 @@ "seg_unavailable": "Configuracion no disponible.", "seg_correlation_ds": "Carga segmento DS (%)", "seg_correlation_us": "Carga segmento US (%)", + "seg_events_title": "Eventos de saturacion del segmento", + "seg_events_subtitle": "Periodos en los que la carga downstream o upstream se mantuvo en el umbral o por encima.", + "seg_events_meta": "Umbral {th}% durante {min}+ min", + "seg_events_loading": "Cargando eventos de saturacion...", + "seg_events_empty": "No se detectaron eventos de saturacion en este rango.", + "seg_events_error": "No se pudieron cargar los eventos de saturacion.", + "seg_events_direction_ds": "Downstream", + "seg_events_direction_us": "Upstream", + "seg_events_peak_total": "Pico total", + "seg_events_peak_own": "Pico propio", + "seg_events_peak_neighbor": "Pico carga vecina", + "seg_events_confidence": "Confianza", + "seg_events_confidence_high": "alta", + "seg_events_confidence_medium": "media", + "seg_events_confidence_low": "baja", + "seg_events_correlate": "Abrir en correlacion", + "seg_events_correlate_out_of_range": "La vista de correlacion admite hasta 7 dias.", + "seg_events_min_short": "min", + "seg_events_h_short": "h", "sidebar_analysis": "Analisis", "smart_capture": "Smart Capture", "sc_enable": "Activar Smart Capture", diff --git a/app/i18n/fr.json b/app/i18n/fr.json index dfc5aaa9..9676266a 100644 --- a/app/i18n/fr.json +++ b/app/i18n/fr.json @@ -787,6 +787,25 @@ "seg_unavailable": "Configuration non disponible.", "seg_correlation_ds": "Charge segment DS (%)", "seg_correlation_us": "Charge segment US (%)", + "seg_events_title": "Evenements de saturation du segment", + "seg_events_subtitle": "Periodes ou la charge downstream ou upstream est restee au-dessus du seuil.", + "seg_events_meta": "Seuil {th}% pendant {min}+ min", + "seg_events_loading": "Chargement des evenements de saturation...", + "seg_events_empty": "Aucun evenement de saturation detecte dans cette plage.", + "seg_events_error": "Impossible de charger les evenements de saturation.", + "seg_events_direction_ds": "Downstream", + "seg_events_direction_us": "Upstream", + "seg_events_peak_total": "Pic total", + "seg_events_peak_own": "Pic propre", + "seg_events_peak_neighbor": "Pic charge voisine", + "seg_events_confidence": "Confiance", + "seg_events_confidence_high": "elevee", + "seg_events_confidence_medium": "moyenne", + "seg_events_confidence_low": "faible", + "seg_events_correlate": "Ouvrir dans la correlation", + "seg_events_correlate_out_of_range": "La vue de correlation prend en charge jusqu'a 7 jours.", + "seg_events_min_short": "min", + "seg_events_h_short": "h", "sidebar_analysis": "Analyse", "smart_capture": "Smart Capture", "sc_enable": "Activer Smart Capture", diff --git a/app/i18n/template.json b/app/i18n/template.json index 95f354b8..62e1c3bc 100644 --- a/app/i18n/template.json +++ b/app/i18n/template.json @@ -734,6 +734,25 @@ "dashboard_features": "", "dashboard_features_desc": "", "seg_disabled": "", + "seg_events_title": "", + "seg_events_subtitle": "", + "seg_events_meta": "", + "seg_events_loading": "", + "seg_events_empty": "", + "seg_events_error": "", + "seg_events_direction_ds": "", + "seg_events_direction_us": "", + "seg_events_peak_total": "", + "seg_events_peak_own": "", + "seg_events_peak_neighbor": "", + "seg_events_confidence": "", + "seg_events_confidence_high": "", + "seg_events_confidence_medium": "", + "seg_events_confidence_low": "", + "seg_events_correlate": "", + "seg_events_correlate_out_of_range": "", + "seg_events_min_short": "", + "seg_events_h_short": "", "sidebar_analysis": "", "font_family": "", "font_family_desc": "", diff --git a/app/static/css/segment-utilization.css b/app/static/css/segment-utilization.css index aaf648a4..e7ad9043 100644 --- a/app/static/css/segment-utilization.css +++ b/app/static/css/segment-utilization.css @@ -117,6 +117,142 @@ font-size: 0.85em; } +/* Events Widget */ +.fritz-cable-events { + padding: 18px; + margin-bottom: 20px; +} + +.fritz-cable-events-header { + display: flex; + align-items: baseline; + justify-content: space-between; + gap: 12px; + flex-wrap: wrap; + margin-bottom: 4px; +} + +.fritz-cable-events-meta { + font-size: 0.8em; + color: var(--text-secondary, #888); +} + +.fritz-cable-events-subtitle { + font-size: 0.85em; + color: var(--text-secondary, #888); + margin-bottom: 12px; +} + +.fritz-cable-events-status { + padding: 16px 0; + color: var(--text-secondary, #888); + font-size: 0.9em; +} + +.fritz-cable-events-status.is-error { + color: var(--danger, #ef4444); +} + +.fritz-cable-events-list { + list-style: none; + padding: 0; + margin: 0; + display: flex; + flex-direction: column; + gap: 10px; +} + +.fritz-cable-event { + padding: 12px 14px; + border: 1px solid var(--border, rgba(255,255,255,0.08)); + border-radius: 10px; + background: var(--surface-2, rgba(255,255,255,0.02)); + display: flex; + flex-direction: column; + gap: 6px; +} + +.fritz-cable-event-header { + display: flex; + align-items: center; + gap: 10px; + flex-wrap: wrap; +} + +.fritz-cable-event-direction { + font-weight: 600; + font-size: 0.8em; + letter-spacing: 0.04em; + text-transform: uppercase; + padding: 2px 8px; + border-radius: 999px; + background: rgba(168, 85, 247, 0.15); + color: rgba(168, 85, 247, 1); +} + +.fritz-cable-event-direction.is-upstream { + background: rgba(99, 102, 241, 0.15); + color: rgba(129, 140, 248, 1); +} + +.fritz-cable-event-time { + font-size: 0.9em; + color: var(--text-primary, var(--text)); +} + +.fritz-cable-event-duration { + margin-left: auto; + font-size: 0.85em; + color: var(--text-secondary, #888); +} + +.fritz-cable-event-stats { + display: flex; + flex-wrap: wrap; + gap: 4px 16px; + font-size: 0.85em; + color: var(--text-primary, var(--text)); +} + +.fritz-cable-event-stats em { + font-style: normal; + color: var(--text-secondary, #888); + margin-right: 4px; +} + +.fritz-cable-event-actions { + display: flex; + justify-content: flex-end; +} + +.fritz-cable-event-link { + font-size: 0.85em; + color: var(--accent, #6366f1); + text-decoration: none; + background: transparent; + border: none; + padding: 0; + cursor: pointer; + font: inherit; + line-height: inherit; +} + +.fritz-cable-event-link:hover { + text-decoration: underline; +} + +.fritz-cable-event-link:focus-visible { + outline: 2px solid var(--accent, #6366f1); + outline-offset: 2px; + border-radius: 2px; +} + +.fritz-cable-event-note { + font-size: 0.85em; + color: var(--muted, #888); + font-style: italic; +} + /* Responsive */ @media (max-width: 768px) { .fritz-cable-kpis { @@ -128,4 +264,7 @@ .fritz-cable-header { flex-direction: column; } + .fritz-cable-event-duration { + margin-left: 0; + } } diff --git a/app/static/js/segment-utilization.js b/app/static/js/segment-utilization.js index 4144daeb..db6172cf 100644 --- a/app/static/js/segment-utilization.js +++ b/app/static/js/segment-utilization.js @@ -48,6 +48,7 @@ function loadFritzCableData() { _fritzCableUpdateKPIs(data); _fritzCableRenderChart('fritz-cable-ds-chart', data.samples, 'ds_total', 'ds_own'); _fritzCableRenderChart('fritz-cable-us-chart', data.samples, 'us_total', 'us_own'); + _fritzCableLoadEvents(); }) .catch(function() { if (skel) skel.style.display = 'none'; @@ -56,6 +57,215 @@ function loadFritzCableData() { }); } +/* ── Events Widget ── */ +var _FRITZ_EVENT_THRESHOLD = 80; +var _FRITZ_EVENT_MIN_MINUTES = 3; + +function _fritzCableLoadEvents() { + var list = document.getElementById('fritz-cable-events-list'); + var status = document.getElementById('fritz-cable-events-status'); + var meta = document.getElementById('fritz-cable-events-meta'); + if (!list || !status) return; + + while (list.firstChild) list.removeChild(list.firstChild); + list.style.display = 'none'; + if (meta) meta.textContent = ''; + status.textContent = _fcT('events_loading', 'Loading saturation events...'); + status.className = 'fritz-cable-events-status is-loading'; + status.style.display = 'block'; + + var url = '/api/fritzbox/segment-utilization/events' + + '?range=' + encodeURIComponent(_fritzCableRange) + + '&threshold=' + _FRITZ_EVENT_THRESHOLD + + '&min_minutes=' + _FRITZ_EVENT_MIN_MINUTES; + + fetch(url) + .then(function(r) { return r.json().then(function(body) { return { ok: r.ok, body: body }; }); }) + .then(function(resp) { + if (!resp.ok || resp.body.error) { + status.textContent = (resp.body && resp.body.error) || _fcT('events_error', 'Could not load saturation events.'); + status.className = 'fritz-cable-events-status is-error'; + return; + } + var events = resp.body.events || []; + var threshold = resp.body.threshold != null ? resp.body.threshold : _FRITZ_EVENT_THRESHOLD; + var minMinutes = resp.body.min_minutes != null ? resp.body.min_minutes : _FRITZ_EVENT_MIN_MINUTES; + if (meta) { + meta.textContent = _fcT('events_meta', 'Threshold {th}% for {min}+ min') + .replace('{th}', threshold) + .replace('{min}', minMinutes); + } + if (events.length === 0) { + status.textContent = _fcT('events_empty', 'No saturation events detected in this range.'); + status.className = 'fritz-cable-events-status is-empty'; + return; + } + status.style.display = 'none'; + list.style.display = ''; + events.forEach(function(ev) { list.appendChild(_fritzCableRenderEvent(ev)); }); + }) + .catch(function() { + status.textContent = _fcT('events_error', 'Could not load saturation events.'); + status.className = 'fritz-cable-events-status is-error'; + }); +} + +function _fritzCableFormatTs(ts, compact) { + if (!ts) return ''; + var d = new Date(ts); + if (isNaN(d.getTime())) return ts; + var pad = function(n) { return (n < 10 ? '0' : '') + n; }; + var hh = pad(d.getHours()); + var mm = pad(d.getMinutes()); + var dd = pad(d.getDate()); + var mo = pad(d.getMonth() + 1); + if (compact) return hh + ':' + mm; + return dd + '.' + mo + ' ' + hh + ':' + mm; +} + +function _fritzCableFormatDuration(minutes) { + if (minutes == null) return ''; + var m = Math.max(0, minutes | 0); + if (m < 60) return m + ' ' + _fcT('events_min_short', 'min'); + var h = Math.floor(m / 60); + var rem = m % 60; + if (rem === 0) return h + ' ' + _fcT('events_h_short', 'h'); + return h + ' ' + _fcT('events_h_short', 'h') + ' ' + rem + ' ' + _fcT('events_min_short', 'min'); +} + +function _fritzCablePct(v) { + if (v == null || isNaN(v)) return '-'; + return (+v).toFixed(1) + '%'; +} + +function _fcMakeStat(labelKey, labelFallback, valueText) { + var span = document.createElement('span'); + var em = document.createElement('em'); + em.textContent = _fcT(labelKey, labelFallback) + ':'; + span.appendChild(em); + span.appendChild(document.createTextNode(' ' + valueText)); + return span; +} + +function _fritzCableRenderEvent(ev) { + var li = document.createElement('li'); + li.className = 'fritz-cable-event'; + + var directionKey = ev.direction === 'upstream' ? 'events_direction_us' : 'events_direction_ds'; + var directionLabel = _fcT(directionKey, ev.direction === 'upstream' ? 'Upstream' : 'Downstream'); + var directionCls = ev.direction === 'upstream' ? 'is-upstream' : 'is-downstream'; + + var sameDay = false; + if (ev.start && ev.end) { + var ds = new Date(ev.start); + var de = new Date(ev.end); + sameDay = !isNaN(ds) && !isNaN(de) + && ds.getFullYear() === de.getFullYear() + && ds.getMonth() === de.getMonth() + && ds.getDate() === de.getDate(); + } + var rangeText; + if (ev.start && ev.end && ev.start !== ev.end) { + rangeText = _fritzCableFormatTs(ev.start, false) + ' – ' + _fritzCableFormatTs(ev.end, sameDay); + } else { + rangeText = _fritzCableFormatTs(ev.start, false) + ' (+' + _fritzCableFormatDuration(ev.duration_minutes) + ')'; + } + + var header = document.createElement('div'); + header.className = 'fritz-cable-event-header'; + var dirSpan = document.createElement('span'); + dirSpan.className = 'fritz-cable-event-direction ' + directionCls; + dirSpan.textContent = directionLabel; + header.appendChild(dirSpan); + var timeSpan = document.createElement('span'); + timeSpan.className = 'fritz-cable-event-time'; + timeSpan.textContent = rangeText; + header.appendChild(timeSpan); + var durSpan = document.createElement('span'); + durSpan.className = 'fritz-cable-event-duration'; + durSpan.textContent = _fritzCableFormatDuration(ev.duration_minutes); + header.appendChild(durSpan); + li.appendChild(header); + + var neighbor = ev.peak_neighbor_load; + if (neighbor == null && ev.peak_total != null && ev.peak_own != null) { + neighbor = ev.peak_total - ev.peak_own; + } + var stats = document.createElement('div'); + stats.className = 'fritz-cable-event-stats'; + stats.appendChild(_fcMakeStat('events_peak_total', 'Peak total', _fritzCablePct(ev.peak_total))); + stats.appendChild(_fcMakeStat('events_peak_own', 'Peak own', _fritzCablePct(ev.peak_own))); + stats.appendChild(_fcMakeStat('events_peak_neighbor', 'Peak neighbor load', _fritzCablePct(neighbor))); + var confKey = 'events_confidence_' + (ev.confidence || 'high'); + var confVal = _fcT(confKey, ev.confidence || 'high'); + stats.appendChild(_fcMakeStat('events_confidence', 'Confidence', confVal)); + li.appendChild(stats); + + var actions = document.createElement('div'); + actions.className = 'fritz-cable-event-actions'; + var hours = _fritzCablePickCorrelationHours(ev.start, ev.end, Date.now()); + if (hours != null) { + var btn = document.createElement('button'); + btn.type = 'button'; + btn.className = 'fritz-cable-event-link'; + btn.textContent = _fcT('events_correlate', 'Open in correlation'); + btn.addEventListener('click', function() { + _fritzCableOpenInCorrelation(hours); + }); + actions.appendChild(btn); + } else { + var note = document.createElement('span'); + note.className = 'fritz-cable-event-note'; + note.textContent = _fcT('events_correlate_out_of_range', 'Correlation view supports up to 7 days.'); + actions.appendChild(note); + } + li.appendChild(actions); + + return li; +} + +/* ── Correlation Navigation ── + * The correlation view has a fixed set of "hours" pills (24 / 48 / 168). + * Pick the smallest pill whose window contains the entire event (start + * through end). If the event's start predates the largest pill, return + * null so the caller can hide the action — a narrower window would crop + * the event. Pure function of its inputs. */ +var _FRITZ_CORRELATION_PILLS = [24, 48, 168]; +function _fritzCablePickCorrelationHours(eventStartIso, eventEndIso, nowMs) { + if (!eventStartIso) return null; + var startMs = Date.parse(eventStartIso); + if (isNaN(startMs)) return null; + var ageHours = Math.max(0, (nowMs - startMs) / 3600000); + // +1h pad so the event is comfortably inside the window even after + // rounding the correlation range to the nearest hour. + var needed = Math.ceil(ageHours) + 1; + for (var i = 0; i < _FRITZ_CORRELATION_PILLS.length; i++) { + if (_FRITZ_CORRELATION_PILLS[i] >= needed) return _FRITZ_CORRELATION_PILLS[i]; + } + return null; +} + +/* Activating the correlation pill BEFORE switchView is load-bearing: + * switchView synchronously calls loadCorrelationData(), which reads the + * active pill via getPillValue(). If we switched first and activated the + * pill afterwards, two fetches would race and stale data could win. */ +function _fritzCableOpenInCorrelation(hours) { + if (hours == null) return; + var tabs = document.querySelectorAll('#correlation-tabs .trend-tab'); + tabs.forEach(function(tab) { + var match = parseInt(tab.getAttribute('data-value'), 10) === hours; + tab.classList.toggle('active', match); + }); + if (typeof window.switchView === 'function') { + window.switchView('correlation'); + } else { + location.hash = 'correlation'; + } +} + +window._fritzCablePickCorrelationHours = _fritzCablePickCorrelationHours; +window._fritzCableOpenInCorrelation = _fritzCableOpenInCorrelation; + /* ── KPI Update ── */ function _fritzCableUpdateKPIs(data) { var latest = data.latest && data.latest[0]; diff --git a/app/storage/segment_utilization.py b/app/storage/segment_utilization.py index 53ef63e0..2e4faba0 100644 --- a/app/storage/segment_utilization.py +++ b/app/storage/segment_utilization.py @@ -5,6 +5,27 @@ from datetime import datetime, timedelta, timezone +EVENT_DEFAULT_THRESHOLD = 80 +EVENT_DEFAULT_MIN_MINUTES = 3 + + +def _parse_ts(ts): + """Parse DOCSight's ISO timestamp strings into aware UTC datetimes.""" + if not ts: + return None + try: + if ts.endswith("Z"): + return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) + return datetime.fromisoformat(ts) + except (ValueError, TypeError): + return None + + +def _minutes_between(start_dt, end_dt): + """Whole minutes between two aware datetimes (rounded to nearest).""" + return int(round((end_dt - start_dt).total_seconds() / 60.0)) + + def _normalize_range_ts(ts, separator="T"): """Accept either ISO 'T' or legacy space-separated timestamps for queries.""" if not ts or len(ts) < 19: @@ -43,6 +64,25 @@ def _ensure_table(self): CREATE UNIQUE INDEX IF NOT EXISTS idx_segment_util_ts ON segment_utilization(timestamp) """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS segment_utilization_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + direction TEXT NOT NULL, + start_ts TEXT NOT NULL, + end_ts TEXT NOT NULL, + duration_minutes INTEGER NOT NULL, + peak_total REAL, + peak_own REAL, + peak_neighbor_load REAL, + confidence TEXT, + threshold INTEGER NOT NULL, + min_minutes INTEGER NOT NULL + ) + """) + conn.execute(""" + CREATE UNIQUE INDEX IF NOT EXISTS idx_segment_util_events_key + ON segment_utilization_events(direction, start_ts, threshold, min_minutes) + """) conn.commit() finally: conn.close() @@ -125,11 +165,21 @@ def downsample(self, fine_after_days=7, fine_bucket_min=5, coarse_after_days=30, - Samples older than fine_after_days (default 7): averaged into fine_bucket_min (5-min) buckets - Samples older than coarse_after_days (default 30): averaged into coarse_bucket_min (15-min) buckets + Before data is downsampled, materialize saturation events at default + parameters (threshold=80%, min_minutes=3) so that peaks are preserved + for the long-term events view even after sample averaging. + Returns total number of rows removed by aggregation. """ now = datetime.now(timezone.utc) removed = 0 + # Materialize events BEFORE we lose raw peaks to averaging. Use the + # earliest cutoff (fine) so any data that's about to be aggregated is + # inspected first. + fine_cutoff = (now - timedelta(days=fine_after_days)).strftime("%Y-%m-%dT%H:%M:%SZ") + self.materialize_events_before(fine_cutoff) + tiers = [ (coarse_after_days, coarse_bucket_min), # coarse first (older data) (fine_after_days, fine_bucket_min), @@ -141,6 +191,86 @@ def downsample(self, fine_after_days=7, fine_bucket_min=5, coarse_after_days=30, return removed + def materialize_events_before(self, before_ts, + threshold=EVENT_DEFAULT_THRESHOLD, + min_minutes=EVENT_DEFAULT_MIN_MINUTES): + """Detect saturation events ending before ``before_ts`` and persist them. + + Invoked ahead of downsampling so peak samples drive event records that + survive future aggregation. Idempotent via a UNIQUE index on + (direction, start_ts, threshold, min_minutes). + + Returns the number of newly inserted event rows. + """ + before_ts = _normalize_range_ts(before_ts, "T") + rows = self.get_range("2000-01-01T00:00:00Z", before_ts) + if not rows: + return 0 + + events = [] + events.extend(self._detect_runs(rows, "ds_total", "ds_own", "downstream", threshold, min_minutes)) + events.extend(self._detect_runs(rows, "us_total", "us_own", "upstream", threshold, min_minutes)) + # Only materialize runs that are fully inside the historical window. + # Ongoing runs that extend past ``before_ts`` are still visible via + # live detection on raw data and would just be duplicates here. + events = [e for e in events if e["end"] < before_ts] + if not events: + return 0 + + inserted = 0 + with self._lock: + conn = self._connect() + try: + for ev in events: + cur = conn.execute( + "INSERT OR IGNORE INTO segment_utilization_events " + "(direction, start_ts, end_ts, duration_minutes, peak_total, peak_own, " + " peak_neighbor_load, confidence, threshold, min_minutes) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + ev["direction"], ev["start"], ev["end"], ev["duration_minutes"], + ev["peak_total"], ev["peak_own"], ev["peak_neighbor_load"], + ev["confidence"], threshold, min_minutes, + ), + ) + if cur.rowcount: + inserted += 1 + conn.commit() + finally: + conn.close() + return inserted + + def _load_materialized_events(self, start_ts, end_ts, threshold, min_minutes): + """Return materialized events that fall within the requested range.""" + start_ts = _normalize_range_ts(start_ts, "T") + end_ts = _normalize_range_ts(end_ts, "T") + conn = self._connect() + try: + rows = conn.execute( + "SELECT direction, start_ts, end_ts, duration_minutes, peak_total, peak_own, " + "peak_neighbor_load, confidence " + "FROM segment_utilization_events " + "WHERE threshold = ? AND min_minutes = ? " + "AND start_ts >= ? AND start_ts <= ? " + "ORDER BY start_ts", + (threshold, min_minutes, start_ts, end_ts), + ).fetchall() + return [ + { + "direction": r["direction"], + "start": r["start_ts"], + "end": r["end_ts"], + "duration_minutes": r["duration_minutes"], + "peak_total": r["peak_total"], + "peak_own": r["peak_own"], + "peak_neighbor_load": r["peak_neighbor_load"], + "confidence": r["confidence"], + } + for r in rows + ] + finally: + conn.close() + def _downsample_range(self, before_ts, bucket_minutes): """Aggregate all samples before before_ts into bucket_minutes-wide averages.""" with self._lock: @@ -194,6 +324,92 @@ def _downsample_range(self, before_ts, bucket_minutes): finally: conn.close() + def get_events(self, start_ts, end_ts, threshold=EVENT_DEFAULT_THRESHOLD, + min_minutes=EVENT_DEFAULT_MIN_MINUTES): + """Return segment saturation events within a time range. + + Combines live detection on raw samples (``get_range`` -> ``_detect_runs``) + with records previously materialized by ``materialize_events_before``. + Materialized events are preferred because they were computed from raw + peaks before downsampling averaged them away. Live detection catches + events in still-raw data (not yet materialized) as well as events with + non-default thresholds. Dedupe key is (direction, start). + + An event is a window where ds_total or us_total stayed at or above + ``threshold`` for at least ``min_minutes`` consecutive minute-spaced + samples. A gap greater than ~90s between adjacent samples breaks a + raw run. + """ + rows = self.get_range(start_ts, end_ts) + detected = [] + if rows: + detected.extend(self._detect_runs(rows, "ds_total", "ds_own", "downstream", threshold, min_minutes)) + detected.extend(self._detect_runs(rows, "us_total", "us_own", "upstream", threshold, min_minutes)) + + stored = self._load_materialized_events(start_ts, end_ts, threshold, min_minutes) + + merged = {} + for ev in detected: + merged[(ev["direction"], ev["start"])] = ev + # Stored records win — they were computed on pre-downsample data. + for ev in stored: + merged[(ev["direction"], ev["start"])] = ev + + return sorted(merged.values(), key=lambda e: (e["start"], e["direction"])) + + @staticmethod + def _detect_runs(rows, total_key, own_key, direction, threshold, min_minutes): + """Walk rows once, emitting one event per qualifying consecutive run.""" + detected = [] + run = [] # list of (ts_str, parsed_dt, total, own) + prev_dt = None + + def flush(): + if len(run) >= min_minutes: + peak_total = max(r[2] for r in run) + peak_idx = max(range(len(run)), key=lambda i: run[i][2]) + peak_own = run[peak_idx][3] if run[peak_idx][3] is not None else 0.0 + peak_neighbor = max( + r[2] - (r[3] if r[3] is not None else 0.0) for r in run + ) + duration = _minutes_between(run[0][1], run[-1][1]) + 1 + detected.append({ + "direction": direction, + "start": run[0][0], + "end": run[-1][0], + "duration_minutes": duration, + "peak_total": peak_total, + "peak_own": peak_own, + "peak_neighbor_load": peak_neighbor, + "confidence": "high", + }) + + for row in rows: + total = row.get(total_key) + ts_str = row["timestamp"] + dt = _parse_ts(ts_str) + if total is None or dt is None: + flush() + run = [] + prev_dt = None + continue + if total < threshold: + flush() + run = [] + prev_dt = dt + continue + # Above threshold. Break run if gap from previous raw step > ~90s. + if prev_dt is not None and run: + gap = (dt - prev_dt).total_seconds() + if gap > 90: + flush() + run = [] + run.append((ts_str, dt, total, row.get(own_key))) + prev_dt = dt + + flush() + return detected + def cleanup(self, days=365): """Delete records older than the given number of days. Returns count deleted.""" cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ") diff --git a/app/templates/segment_utilization_tab.html b/app/templates/segment_utilization_tab.html index 000f6fa0..d9dfd0ba 100644 --- a/app/templates/segment_utilization_tab.html +++ b/app/templates/segment_utilization_tab.html @@ -57,6 +57,16 @@

{{ t.get('seg_title', 'Segment Utilization') }} +
+
+
{{ t.get('seg_events_title', 'Segment Saturation Events') }}
+
+
+
{{ t.get('seg_events_subtitle', 'Periods where downstream or upstream load stayed at or above the threshold.') }}
+
+
    +
    +
    {{ t.get('seg_note', 'Segment utilization values are estimates provided by the FRITZ!Box and may differ from actual CMTS-wide load.') }}
    diff --git a/tests/modules/test_fritzbox_cable_routes.py b/tests/modules/test_fritzbox_cable_routes.py index 4dc9fd56..025688a8 100644 --- a/tests/modules/test_fritzbox_cable_routes.py +++ b/tests/modules/test_fritzbox_cable_routes.py @@ -51,6 +51,144 @@ def test_returns_stored_data(self, mock_get_storage, mock_get_config, client): assert "stats" in data +class TestSegmentEventsEndpoint: + @patch("app.blueprints.segment_bp.require_auth", lambda f: f) + @patch("app.blueprints.segment_bp.get_config_manager") + @patch("app.blueprints.segment_bp._get_storage") + def test_returns_events_payload(self, mock_get_storage, mock_get_config, client): + mock_cfg = MagicMock() + mock_cfg.get.return_value = "fritzbox" + mock_cfg.is_segment_utilization_enabled.return_value = True + mock_get_config.return_value = mock_cfg + + mock_storage = MagicMock() + mock_storage.get_events.return_value = [ + { + "direction": "downstream", + "start": "2026-03-09T10:01:00Z", + "end": "2026-03-09T10:03:00Z", + "duration_minutes": 3, + "peak_total": 90.0, + "peak_own": 3.0, + "peak_neighbor_load": 87.0, + "confidence": "high", + } + ] + mock_get_storage.return_value = mock_storage + + resp = client.get("/api/fritzbox/segment-utilization/events?range=7d&threshold=80&min_minutes=3") + assert resp.status_code == 200 + data = resp.get_json() + assert "events" in data + assert len(data["events"]) == 1 + ev = data["events"][0] + assert ev["direction"] == "downstream" + assert ev["duration_minutes"] == 3 + # Echo back the validated params so the UI can show them. + assert data.get("threshold") == 80 + assert data.get("min_minutes") == 3 + + @patch("app.blueprints.segment_bp.require_auth", lambda f: f) + @patch("app.blueprints.segment_bp.get_config_manager") + @patch("app.blueprints.segment_bp._get_storage") + def test_clamps_invalid_threshold_and_min_minutes(self, mock_get_storage, mock_get_config, client): + mock_cfg = MagicMock() + mock_cfg.get.return_value = "fritzbox" + mock_cfg.is_segment_utilization_enabled.return_value = True + mock_get_config.return_value = mock_cfg + + mock_storage = MagicMock() + mock_storage.get_events.return_value = [] + mock_get_storage.return_value = mock_storage + + # Unparseable values fall back to defaults, out-of-range values are clamped. + resp = client.get("/api/fritzbox/segment-utilization/events?threshold=abc&min_minutes=-5") + assert resp.status_code == 200 + data = resp.get_json() + assert data["threshold"] == 80 # unparseable -> default + assert data["min_minutes"] == 1 # -5 clamps to lower bound + + resp = client.get("/api/fritzbox/segment-utilization/events?threshold=500&min_minutes=99999") + assert resp.status_code == 200 + data = resp.get_json() + assert data["threshold"] == 100 + assert data["min_minutes"] == 1440 + + @patch("app.blueprints.segment_bp.require_auth", lambda f: f) + @patch("app.blueprints.segment_bp.get_config_manager") + @patch("app.blueprints.segment_bp._get_storage") + def test_rejects_when_driver_unsupported(self, mock_get_storage, mock_get_config, client): + mock_cfg = MagicMock() + mock_cfg.get.return_value = "arris" + mock_get_config.return_value = mock_cfg + mock_get_storage.return_value = MagicMock() + + resp = client.get("/api/fritzbox/segment-utilization/events") + assert resp.status_code == 400 + + @patch("app.blueprints.segment_bp.require_auth", lambda f: f) + @patch("app.blueprints.segment_bp.get_config_manager") + @patch("app.blueprints.segment_bp._get_storage") + def test_passes_range_to_storage(self, mock_get_storage, mock_get_config, client): + mock_cfg = MagicMock() + mock_cfg.get.return_value = "fritzbox" + mock_cfg.is_segment_utilization_enabled.return_value = True + mock_get_config.return_value = mock_cfg + + mock_storage = MagicMock() + mock_storage.get_events.return_value = [] + mock_get_storage.return_value = mock_storage + + resp = client.get("/api/fritzbox/segment-utilization/events?range=24h&threshold=70&min_minutes=5") + assert resp.status_code == 200 + mock_storage.get_events.assert_called_once() + _, kwargs = mock_storage.get_events.call_args + assert kwargs.get("threshold") == 70 + assert kwargs.get("min_minutes") == 5 + + @patch("app.blueprints.segment_bp.require_auth", lambda f: f) + @patch("app.blueprints.segment_bp.get_config_manager") + @patch("app.blueprints.segment_bp._get_storage") + def test_invalid_range_is_normalized_in_echo(self, mock_get_storage, mock_get_config, client): + """An unrecognized range string must not be echoed verbatim — + the response reports the default the server actually used.""" + mock_cfg = MagicMock() + mock_cfg.get.return_value = "fritzbox" + mock_cfg.is_segment_utilization_enabled.return_value = True + mock_get_config.return_value = mock_cfg + + mock_storage = MagicMock() + mock_storage.get_events.return_value = [] + mock_get_storage.return_value = mock_storage + + resp = client.get("/api/fritzbox/segment-utilization/events?range=not-a-range") + assert resp.status_code == 200 + data = resp.get_json() + assert data["range"] == "7d" + + +class TestSegmentDataEndpointRange: + @patch("app.blueprints.segment_bp.require_auth", lambda f: f) + @patch("app.blueprints.segment_bp.get_config_manager") + @patch("app.blueprints.segment_bp._get_storage") + def test_invalid_range_normalizes_to_default(self, mock_get_storage, mock_get_config, client): + mock_cfg = MagicMock() + mock_cfg.get.return_value = "fritzbox" + mock_cfg.is_segment_utilization_enabled.return_value = True + mock_get_config.return_value = mock_cfg + + mock_storage = MagicMock() + mock_storage.get_range.return_value = [] + mock_storage.get_latest.return_value = [] + mock_storage.get_stats.return_value = {"count": 0} + mock_get_storage.return_value = mock_storage + + resp = client.get("/api/fritzbox/segment-utilization?range=bogus") + # The default (24h) was applied, so the call succeeds and does not + # echo the invalid input back anywhere in the response. + assert resp.status_code == 200 + + class TestSegmentRangeEndpoint: @patch("app.blueprints.segment_bp.require_auth", lambda f: f) @patch("app.blueprints.segment_bp.get_config_manager") @@ -58,7 +196,7 @@ class TestSegmentRangeEndpoint: def test_range_endpoint_for_correlation(self, mock_get_storage, mock_get_config, client): mock_cfg = MagicMock() mock_cfg.get.return_value = "fritzbox" - mock_cfg.is_demo_mode.return_value = False + mock_cfg.is_segment_utilization_enabled.return_value = True mock_get_config.return_value = mock_cfg mock_storage = MagicMock() @@ -72,3 +210,44 @@ def test_range_endpoint_for_correlation(self, mock_get_storage, mock_get_config, data = resp.get_json() assert len(data) == 1 assert data[0]["ds_total"] == pytest.approx(6.2) + + @patch("app.blueprints.segment_bp.require_auth", lambda f: f) + @patch("app.blueprints.segment_bp.get_config_manager") + @patch("app.blueprints.segment_bp._get_storage") + def test_range_endpoint_returns_empty_when_driver_unsupported( + self, mock_get_storage, mock_get_config, client, + ): + """The correlation graph fetches this range endpoint opportunistically. + When the modem driver isn't fritzbox, the endpoint must return an + empty list so the correlation view degrades cleanly.""" + mock_cfg = MagicMock() + mock_cfg.get.return_value = "arris" + mock_cfg.is_segment_utilization_enabled.return_value = True + mock_get_config.return_value = mock_cfg + mock_get_storage.return_value = MagicMock() + + resp = client.get( + "/api/fritzbox/segment-utilization/range" + "?start=2026-03-09T00:00:00Z&end=2026-03-09T23:59:59Z" + ) + assert resp.status_code == 200 + assert resp.get_json() == [] + + @patch("app.blueprints.segment_bp.require_auth", lambda f: f) + @patch("app.blueprints.segment_bp.get_config_manager") + @patch("app.blueprints.segment_bp._get_storage") + def test_range_endpoint_returns_empty_when_feature_disabled( + self, mock_get_storage, mock_get_config, client, + ): + mock_cfg = MagicMock() + mock_cfg.get.return_value = "fritzbox" + mock_cfg.is_segment_utilization_enabled.return_value = False + mock_get_config.return_value = mock_cfg + mock_get_storage.return_value = MagicMock() + + resp = client.get( + "/api/fritzbox/segment-utilization/range" + "?start=2026-03-09T00:00:00Z&end=2026-03-09T23:59:59Z" + ) + assert resp.status_code == 200 + assert resp.get_json() == [] diff --git a/tests/modules/test_fritzbox_cable_storage.py b/tests/modules/test_fritzbox_cable_storage.py index bd82eb2d..20b73e1e 100644 --- a/tests/modules/test_fritzbox_cable_storage.py +++ b/tests/modules/test_fritzbox_cable_storage.py @@ -1,7 +1,5 @@ """Tests for fritzbox_cable segment utilization storage.""" -import os -import tempfile import pytest from app.storage.segment_utilization import SegmentUtilizationStorage @@ -169,6 +167,258 @@ def test_downsample_idempotent(self, storage): assert len(rows) == 1 +class TestGetEvents: + """Detect saturation events from raw 1-minute samples. + + Events: windows where ds_total or us_total stays >= threshold for + at least min_minutes consecutive minute-spaced samples. Downstream + and upstream are evaluated independently; a gap in the minute stream + breaks a raw event. + """ + + def test_empty_range_returns_no_events(self, storage): + events = storage.get_events( + "2000-01-01T00:00:00Z", "2000-01-02T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert events == [] + + def test_single_sample_above_threshold_below_min_duration(self, storage): + storage.save_at("2026-03-09T10:00:00Z", 50.0, 50.0, 1.0, 1.0) + storage.save_at("2026-03-09T10:01:00Z", 85.0, 50.0, 1.0, 1.0) + storage.save_at("2026-03-09T10:02:00Z", 50.0, 50.0, 1.0, 1.0) + events = storage.get_events( + "2026-03-09T00:00:00Z", "2026-03-10T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert events == [] + + def test_sustained_downstream_event(self, storage): + storage.save_at("2026-03-09T10:00:00Z", 50.0, 30.0, 1.0, 0.5) + storage.save_at("2026-03-09T10:01:00Z", 82.0, 30.0, 2.0, 0.5) + storage.save_at("2026-03-09T10:02:00Z", 90.0, 30.0, 3.0, 0.5) + storage.save_at("2026-03-09T10:03:00Z", 88.0, 30.0, 4.0, 0.5) + storage.save_at("2026-03-09T10:04:00Z", 50.0, 30.0, 1.0, 0.5) + events = storage.get_events( + "2026-03-09T00:00:00Z", "2026-03-10T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert len(events) == 1 + ev = events[0] + assert ev["direction"] == "downstream" + assert ev["start"] == "2026-03-09T10:01:00Z" + assert ev["end"] == "2026-03-09T10:03:00Z" + assert ev["duration_minutes"] == 3 + assert ev["peak_total"] == pytest.approx(90.0) + assert ev["peak_own"] == pytest.approx(3.0) + assert ev["peak_neighbor_load"] == pytest.approx(87.0) + assert ev["confidence"] == "high" + + def test_peak_neighbor_load_uses_max_across_run_not_peak_total_sample(self, storage): + """Regression: peak neighbor must be max(total - own) across the run, + not (total - own) at the single peak-total sample. If own traffic + dominates at the peak total minute, the neighbor contribution is + actually maximized elsewhere in the run.""" + # Run of 3 above-threshold minutes. + # t0: total=85, own=5 -> neighbor = 80 + # t1: total=92, own=60 -> neighbor = 32 (peak total, low neighbor) + # t2: total=83, own=2 -> neighbor = 81 (peak neighbor) + storage.save_at("2026-03-09T10:00:00Z", 85.0, 10.0, 5.0, 0.5) + storage.save_at("2026-03-09T10:01:00Z", 92.0, 10.0, 60.0, 0.5) + storage.save_at("2026-03-09T10:02:00Z", 83.0, 10.0, 2.0, 0.5) + events = storage.get_events( + "2026-03-09T00:00:00Z", "2026-03-10T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert len(events) == 1 + ev = events[0] + assert ev["peak_total"] == pytest.approx(92.0) + assert ev["peak_own"] == pytest.approx(60.0) + # Must report the highest neighbor contribution seen in the run, + # which occurs at t2 with neighbor = 83 - 2 = 81. + assert ev["peak_neighbor_load"] == pytest.approx(81.0) + + def test_sustained_upstream_event(self, storage): + storage.save_at("2026-03-09T12:00:00Z", 10.0, 40.0, 0.5, 0.5) + storage.save_at("2026-03-09T12:01:00Z", 10.0, 81.0, 0.5, 2.0) + storage.save_at("2026-03-09T12:02:00Z", 10.0, 85.0, 0.5, 3.0) + storage.save_at("2026-03-09T12:03:00Z", 10.0, 92.0, 0.5, 4.0) + storage.save_at("2026-03-09T12:04:00Z", 10.0, 40.0, 0.5, 0.5) + events = storage.get_events( + "2026-03-09T00:00:00Z", "2026-03-10T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert len(events) == 1 + ev = events[0] + assert ev["direction"] == "upstream" + assert ev["start"] == "2026-03-09T12:01:00Z" + assert ev["end"] == "2026-03-09T12:03:00Z" + assert ev["duration_minutes"] == 3 + assert ev["peak_total"] == pytest.approx(92.0) + assert ev["peak_own"] == pytest.approx(4.0) + assert ev["peak_neighbor_load"] == pytest.approx(88.0) + assert ev["confidence"] == "high" + + def test_two_separate_events(self, storage): + # Event 1: 10:00 through 10:02 (3 min) downstream + for i, v in enumerate([85.0, 90.0, 82.0]): + storage.save_at(f"2026-03-09T10:0{i}:00Z", v, 10.0, 1.0, 0.2) + # Gap of low samples + storage.save_at("2026-03-09T10:03:00Z", 30.0, 10.0, 1.0, 0.2) + storage.save_at("2026-03-09T10:04:00Z", 30.0, 10.0, 1.0, 0.2) + # Event 2: 10:05 through 10:07 (3 min) downstream + for i, v in enumerate([81.0, 83.0, 84.0]): + storage.save_at(f"2026-03-09T10:0{i+5}:00Z", v, 10.0, 1.0, 0.2) + events = storage.get_events( + "2026-03-09T00:00:00Z", "2026-03-10T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert len(events) == 2 + assert events[0]["start"] == "2026-03-09T10:00:00Z" + assert events[0]["end"] == "2026-03-09T10:02:00Z" + assert events[1]["start"] == "2026-03-09T10:05:00Z" + assert events[1]["end"] == "2026-03-09T10:07:00Z" + + def test_missing_sample_breaks_raw_event(self, storage): + # 4 above-threshold samples but with a 2-minute gap in the middle; + # neither half alone meets the 3-minute requirement. + storage.save_at("2026-03-09T10:00:00Z", 85.0, 10.0, 1.0, 0.2) + storage.save_at("2026-03-09T10:01:00Z", 86.0, 10.0, 1.0, 0.2) + # gap at 10:02 and 10:03 + storage.save_at("2026-03-09T10:04:00Z", 87.0, 10.0, 1.0, 0.2) + storage.save_at("2026-03-09T10:05:00Z", 88.0, 10.0, 1.0, 0.2) + events = storage.get_events( + "2026-03-09T00:00:00Z", "2026-03-10T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert events == [] + + def test_ds_and_us_events_are_independent(self, storage): + # Same 3 minutes: downstream saturates, upstream does not. + storage.save_at("2026-03-09T10:00:00Z", 85.0, 40.0, 1.0, 0.5) + storage.save_at("2026-03-09T10:01:00Z", 90.0, 40.0, 1.0, 0.5) + storage.save_at("2026-03-09T10:02:00Z", 82.0, 40.0, 1.0, 0.5) + events = storage.get_events( + "2026-03-09T00:00:00Z", "2026-03-10T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert len(events) == 1 + assert events[0]["direction"] == "downstream" + + def test_respects_time_range_filter(self, storage): + for i in range(3): + storage.save_at(f"2026-03-09T10:0{i}:00Z", 90.0, 10.0, 1.0, 0.2) + events = storage.get_events( + "2026-03-10T00:00:00Z", "2026-03-11T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert events == [] + + +class TestMaterializedEvents: + """Events must survive downsampling: peaks are captured into a dedicated + events table before sample averaging smooths them away.""" + + def test_events_survive_downsampling(self, storage): + """Raw 1-min samples with a 3-minute saturation peak get downsampled + into 5-min averages that fall below threshold. The event must still + be returned by get_events() because it was materialized first.""" + # 5 raw samples in a 5-min bucket. Only 3 of them exceed 80%, so the + # bucket average is ~68 (below threshold) after downsampling. + storage.save_at("2020-01-01T14:00:00Z", 40.0, 10.0, 1.0, 0.1) + storage.save_at("2020-01-01T14:01:00Z", 85.0, 10.0, 1.0, 0.1) + storage.save_at("2020-01-01T14:02:00Z", 90.0, 10.0, 1.0, 0.1) + storage.save_at("2020-01-01T14:03:00Z", 82.0, 10.0, 1.0, 0.1) + storage.save_at("2020-01-01T14:04:00Z", 40.0, 10.0, 1.0, 0.1) + + # Sanity: event detectable on raw data. + raw_events = storage.get_events( + "2020-01-01T00:00:00Z", "2020-01-02T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert len(raw_events) == 1 + assert raw_events[0]["peak_total"] == pytest.approx(90.0) + + # Downsample aggressively — samples older than 0 days get averaged. + storage.downsample(fine_after_days=0, fine_bucket_min=5, + coarse_after_days=9999, coarse_bucket_min=15) + + remaining = storage.get_range("2020-01-01T00:00:00Z", "2020-01-02T00:00:00Z") + assert len(remaining) == 1 + # The averaged sample falls below the 80% threshold — the event is no + # longer detectable from raw data. + assert remaining[0]["ds_total"] < 80 + + # But the event survives via the materialized events table. + preserved = storage.get_events( + "2020-01-01T00:00:00Z", "2020-01-02T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert len(preserved) == 1 + ev = preserved[0] + assert ev["direction"] == "downstream" + assert ev["start"] == "2020-01-01T14:01:00Z" + assert ev["end"] == "2020-01-01T14:03:00Z" + assert ev["peak_total"] == pytest.approx(90.0) + assert ev["duration_minutes"] == 3 + + def test_materialize_idempotent(self, storage): + """Running materialize twice must not create duplicate event rows.""" + for i, v in enumerate([85.0, 90.0, 82.0]): + storage.save_at(f"2020-01-01T14:0{i}:00Z", v, 10.0, 1.0, 0.2) + + first = storage.materialize_events_before("2020-01-02T00:00:00Z") + second = storage.materialize_events_before("2020-01-02T00:00:00Z") + assert first == 1 + assert second == 0 + events = storage.get_events( + "2020-01-01T00:00:00Z", "2020-01-02T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert len(events) == 1 + + def test_materialize_skips_events_still_in_progress(self, storage): + """Events whose end is at or past the cutoff may still be extending; + they should not be stored yet to avoid locking in a truncated run.""" + # Run ends at 14:02, cutoff is 14:02 — end is NOT strictly before cutoff. + for i, v in enumerate([85.0, 90.0, 82.0]): + storage.save_at(f"2020-01-01T14:0{i}:00Z", v, 10.0, 1.0, 0.2) + inserted = storage.materialize_events_before("2020-01-01T14:02:00Z") + assert inserted == 0 + + # With a later cutoff the event is fully historical and gets stored. + inserted = storage.materialize_events_before("2020-01-01T14:05:00Z") + assert inserted == 1 + + def test_get_events_dedupes_materialized_with_raw(self, storage): + """If both raw and materialized detection see the same event, it + must appear only once in the merged output.""" + for i, v in enumerate([85.0, 90.0, 82.0]): + storage.save_at(f"2020-01-01T14:0{i}:00Z", v, 10.0, 1.0, 0.2) + storage.materialize_events_before("2020-01-02T00:00:00Z") + # Raw data is still present (we did not downsample), so on-demand + # detection would also produce the event. The merged result dedupes. + events = storage.get_events( + "2020-01-01T00:00:00Z", "2020-01-02T00:00:00Z", + threshold=80, min_minutes=3, + ) + assert len(events) == 1 + + def test_non_default_threshold_uses_raw_detection(self, storage): + """Materialized events are stored with default parameters. Queries + with non-default thresholds must still work (via raw detection on + data that has not been downsampled).""" + storage.save_at("2020-01-01T14:00:00Z", 70.0, 10.0, 1.0, 0.2) + storage.save_at("2020-01-01T14:01:00Z", 72.0, 10.0, 1.0, 0.2) + storage.save_at("2020-01-01T14:02:00Z", 71.0, 10.0, 1.0, 0.2) + events = storage.get_events( + "2020-01-01T00:00:00Z", "2020-01-02T00:00:00Z", + threshold=65, min_minutes=3, + ) + assert len(events) == 1 + assert events[0]["peak_total"] == pytest.approx(72.0) + + class TestCleanup: def test_cleanup_removes_old_records(self, storage): import sqlite3