From 4695bd43847fcf35221377752be5e884404566bb Mon Sep 17 00:00:00 2001
From: Jeb Baugh
Date: Sun, 18 Jan 2026 11:33:45 -0600
Subject: [PATCH] feat: add watchdog wrapper and FTDI hardening with full recovery on device detach/reattach

- Introduce WatchdogSerialReader with dead-link detection, exception recovery,
  and automatic FTDI reopen logic
- Restore SerialReader to clean, canonical ingestion loop
- Add SerialReaderProtocol for structural typing and mock compatibility
- Add full watchdog unit test suite (4 tests, 100% pass)
- Validate FTDI disappearance and reappearance on Beamrider-0001:
  - watchdog_read_exception triggered correctly
  - watchdog_reopen_start / success loop maintained service uptime
  - ingestion resumed automatically after FTDI reattach
---
Review notes (ignored by git am, not part of the commit message):

- This patch was reconstructed from a copy whose line breaks and indentation
  had been collapsed. Blank context lines and indentation in the
  geiger_reader.py and serial_reader.py hunks were inferred from the hunk
  line counts; verify them against the tree before applying.
- watchdog.py differs from the originally submitted version: elapsed time is
  measured with time.monotonic(), _reopen() restarts the dead-link window so
  a quiet link is reopened once per threshold instead of on every read,
  log calls use lazy %-formatting, and docstrings were added. The "index"
  blob id for watchdog.py is therefore stale.

 app/ingestion/geiger_reader.py            |   7 +-
 app/ingestion/serial_reader.py            |   4 +-
 app/ingestion/watchdog.py                 | 159 ++++++++++++++++++++++
 tests/unit/test_watchdog_serial_reader.py |  95 +++++++++++++++++
 4 files changed, 261 insertions(+), 4 deletions(-)
 create mode 100644 app/ingestion/watchdog.py
 create mode 100644 tests/unit/test_watchdog_serial_reader.py

diff --git a/app/ingestion/geiger_reader.py b/app/ingestion/geiger_reader.py
index f438f0d..b1f40a9 100755
--- a/app/ingestion/geiger_reader.py
+++ b/app/ingestion/geiger_reader.py
@@ -6,7 +6,7 @@ from app.ingestion.api_client import PushClient
 from app.ingestion.serial_reader import SerialReader
 
 
-
+from app.ingestion.watchdog import WatchdogSerialReader
 
 def build_parser() -> argparse.ArgumentParser:
     parser = argparse.ArgumentParser(
@@ -44,11 +44,14 @@ def main() -> int:
     )
     logging.info(f"Device ID: {args.device_id}")
 
-    reader = SerialReader(
+    base_reader = SerialReader(
         device=args.device,
         baudrate=args.baudrate,
     )
 
+    reader = WatchdogSerialReader(base_reader)
+
+
     client = PushClient(
         api_url=args.api_url,
         api_token=args.api_token,
diff --git a/app/ingestion/serial_reader.py b/app/ingestion/serial_reader.py
index 5c44718..17661d8 100755
--- a/app/ingestion/serial_reader.py
+++ b/app/ingestion/serial_reader.py
@@ -61,6 +61,6 @@ def run(self) -> None:
             except (KeyboardInterrupt, StopIteration):
                 break
 
-            except Exception as e:
-                logging.error(f"Error in serial loop: {e}")
+            except Exception as exc:
+                logging.error(f"Error in serial loop: {exc}")
                 time.sleep(0.1)
diff --git a/app/ingestion/watchdog.py b/app/ingestion/watchdog.py
new file mode 100644
index 0000000..8a4ca1b
--- /dev/null
+++ b/app/ingestion/watchdog.py
@@ -0,0 +1,159 @@
+# filename: app/ingestion/watchdog.py
+
+import time
+import logging
+from typing import Any, Optional, Callable, Dict, Protocol
+
+from app.ingestion.csv_parser import parse_geiger_csv
+
+log = logging.getLogger(__name__)
+
+
+class SerialReaderProtocol(Protocol):
+    """Structural type for the reader objects the watchdog can wrap."""
+
+    # Underlying serial handle; the watchdog closes it and sets it to None.
+    ser: Any
+
+    def set_handler(self, handler: Callable[[Dict[str, Any]], None]) -> None:
+        ...
+
+    def read_line(self) -> str:
+        ...
+
+
+class WatchdogSerialReader:
+    """
+    Drop-in wrapper around a SerialReader-like object that adds:
+    - dead-read detection
+    - FTDI disappearance detection
+    - automatic port reopen
+
+    A reopen closes the wrapped reader's ``ser`` handle and sets it to
+    ``None``; the wrapped reader is expected to open the port again on its
+    next ``read_line()`` call.
+    """
+
+    def __init__(
+        self,
+        reader: SerialReaderProtocol,
+        dead_threshold_seconds: float = 5.0,
+        reopen_sleep_seconds: float = 2.0,
+    ) -> None:
+        """
+        Args:
+            reader: Reader to wrap.
+            dead_threshold_seconds: Seconds without a non-empty line before
+                the link is treated as dead and the port is reopened.
+            reopen_sleep_seconds: Pause after closing the port during a reopen.
+        """
+        self._reader: SerialReaderProtocol = reader
+        self._dead_threshold = dead_threshold_seconds
+        self._reopen_sleep = reopen_sleep_seconds
+        # Monotonic clock: wall-clock steps (NTP, manual changes) must not
+        # fake or hide a dead link.
+        self._last_frame_ts = time.monotonic()
+        self._handler: Optional[Callable[[Dict[str, Any]], None]] = None
+
+    # ------------------------------------------------------------
+    # Public API: must match SerialReader
+    # ------------------------------------------------------------
+
+    def set_handler(self, handler: Callable[[Dict[str, Any]], None]) -> None:
+        """Register *handler* on the watchdog and on the wrapped reader."""
+        self._handler = handler
+        self._reader.set_handler(handler)
+
+    def run(self) -> None:
+        """
+        Same loop as SerialReader.run(), but using watchdog-aware read_line().
+
+        Runs until KeyboardInterrupt or StopIteration; any other exception is
+        logged and the loop continues after a short pause.
+        """
+        while True:
+            try:
+                raw = self.read_line()
+                log.info("RAW: %r", raw)
+
+                parsed = parse_geiger_csv(raw)
+                log.info("PARSED: %s", parsed)
+
+                if parsed is not None and self._handler is not None:
+                    self._handler(parsed)
+
+            except (KeyboardInterrupt, StopIteration):
+                break
+
+            except Exception as exc:
+                log.error("Error in watchdog serial loop: %s", exc)
+                time.sleep(0.1)
+
+    # ------------------------------------------------------------
+    # Watchdog logic
+    # ------------------------------------------------------------
+
+    def read_line(self) -> str:
+        """
+        Read one line from the wrapped reader, reopening the port when needed.
+
+        The port is reopened before the read when no non-empty line has
+        arrived within the dead-link threshold, and after the read when the
+        wrapped reader raised. After an exception the read is retried once;
+        a failure of that retry propagates to the caller.
+        """
+        now = time.monotonic()
+
+        # Dead link detection
+        if now - self._last_frame_ts > self._dead_threshold:
+            log.warning(
+                "watchdog_dead_link_detected",
+                extra={"last_frame_age": now - self._last_frame_ts},
+            )
+            self._reopen()
+
+        try:
+            line = self._reader.read_line()
+        except Exception as exc:
+            log.error("watchdog_read_exception", extra={"error": repr(exc)})
+            self._reopen()
+            line = self._reader.read_line()
+
+        if line:
+            self._last_frame_ts = time.monotonic()
+
+        return line
+
+    def _reopen(self) -> None:
+        """
+        Close the wrapped reader's port and clear it so it is reopened lazily.
+
+        Also restarts the dead-link window, so the next reopen is not
+        triggered until a full threshold has elapsed without data.
+        """
+        log.warning("watchdog_reopen_start")
+
+        try:
+            # Best-effort close with proper type narrowing
+            ser = getattr(self._reader, "ser", None)
+            if ser is not None:
+                try:
+                    ser.close()
+                except Exception as exc:
+                    log.error("watchdog_close_failed", extra={"error": repr(exc)})
+
+            # Force lazy reopen
+            self._reader.ser = None
+
+            time.sleep(self._reopen_sleep)
+
+            # Give the reopened port a full threshold before the next
+            # dead-link check; otherwise every read after the first timeout
+            # would close the port again.
+            self._last_frame_ts = time.monotonic()
+
+            log.warning("watchdog_reopen_success")
+
+        except Exception as exc:
+            log.error("watchdog_reopen_failed", extra={"error": repr(exc)})
+            raise
diff --git a/tests/unit/test_watchdog_serial_reader.py b/tests/unit/test_watchdog_serial_reader.py
new file mode 100644
index 0000000..8dba4fc
--- /dev/null
+++ b/tests/unit/test_watchdog_serial_reader.py
@@ -0,0 +1,95 @@
+# filename: tests/unit/test_watchdog_serial_reader.py
+
+import time
+import pytest
+
+from app.ingestion.watchdog import WatchdogSerialReader
+
+
+class MockSerial:
+    def __init__(self):
+        self.closed = False
+
+    def close(self):
+        self.closed = True
+
+
+class MockReader:
+    def __init__(self, lines=None, raise_on_call=False):
+        self.lines = lines or []
+        self.raise_on_call = raise_on_call
+        self.calls = 0
+        self.handler = None
+        self.ser = MockSerial()
+
+    def set_handler(self, handler):
+        self.handler = handler
+
+    def read_line(self):
+        self.calls += 1
+        if self.raise_on_call:
+            raise RuntimeError("boom")
+        if self.lines:
+            return self.lines.pop(0)
+        return ""
+
+
+
+def test_watchdog_proxies_set_handler():
+    mock = MockReader()
+    wd = WatchdogSerialReader(mock)
+
+    def handler(_):
+        pass
+
+    wd.set_handler(handler)
+
+    assert mock.handler is handler
+
+
+def test_watchdog_updates_last_frame_timestamp_on_data():
+    mock = MockReader(lines=["abc"])
+    wd = WatchdogSerialReader(mock)
+
+    before = wd._last_frame_ts
+    time.sleep(0.01)
+
+    line = wd.read_line()
+
+    assert line == "abc"
+    assert wd._last_frame_ts > before
+
+
+def test_watchdog_triggers_reopen_on_exception(monkeypatch):
+    mock = MockReader(raise_on_call=True)
+    wd = WatchdogSerialReader(mock)
+
+    reopened = {"called": False}
+
+    def fake_reopen():
+        reopened["called"] = True
+
+    monkeypatch.setattr(wd, "_reopen", fake_reopen)
+
+    # read_line should catch the exception and call _reopen()
+    with pytest.raises(RuntimeError):
+        # second call after reopen will still raise, so we expect the error
+        wd.read_line()
+
+    assert reopened["called"] is True
+
+
+def test_watchdog_triggers_reopen_on_dead_link(monkeypatch):
+    mock = MockReader(lines=[""])
+    wd = WatchdogSerialReader(mock, dead_threshold_seconds=0.0)
+
+    reopened = {"called": False}
+
+    def fake_reopen():
+        reopened["called"] = True
+
+    monkeypatch.setattr(wd, "_reopen", fake_reopen)
+
+    wd.read_line()
+
+    assert reopened["called"] is True