Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions app/ingestion/geiger_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from app.ingestion.api_client import PushClient
from app.ingestion.serial_reader import SerialReader

from app.ingestion.watchdog import WatchdogSerialReader

def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
Expand Down Expand Up @@ -44,11 +44,14 @@ def main() -> int:
)
logging.info(f"Device ID: {args.device_id}")

reader = SerialReader(
base_reader = SerialReader(
device=args.device,
baudrate=args.baudrate,
)

reader = WatchdogSerialReader(base_reader)


client = PushClient(
api_url=args.api_url,
api_token=args.api_token,
Expand Down
4 changes: 2 additions & 2 deletions app/ingestion/serial_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@ def run(self) -> None:
except (KeyboardInterrupt, StopIteration):
break

except Exception as e:
logging.error(f"Error in serial loop: {e}")
except Exception as exc:
logging.error(f"Error in serial loop: {exc}")
time.sleep(0.1)
120 changes: 120 additions & 0 deletions app/ingestion/watchdog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# filename: app/ingestion/watchdog.py

import time
import logging
from typing import Any, Optional, Callable, Dict, Protocol

from app.ingestion.csv_parser import parse_geiger_csv

log = logging.getLogger(__name__)


class SerialReaderProtocol(Protocol):
ser: Any

def set_handler(self, handler: Callable[[Dict[str, Any]], None]) -> None:
...

def read_line(self) -> str:
...


class WatchdogSerialReader:
"""
Drop-in wrapper around a SerialReader-like object that adds:
- dead-read detection
- FTDI disappearance detection
- automatic port reopen
"""

def __init__(
self,
reader: SerialReaderProtocol,
dead_threshold_seconds: float = 5.0,
reopen_sleep_seconds: float = 2.0,
) -> None:
self._reader: SerialReaderProtocol = reader
self._dead_threshold = dead_threshold_seconds
self._reopen_sleep = reopen_sleep_seconds
self._last_frame_ts = time.time()
self._handler: Optional[Callable[[Dict[str, Any]], None]] = None

# ------------------------------------------------------------
# Public API: must match SerialReader
# ------------------------------------------------------------

def set_handler(self, handler: Callable[[Dict[str, Any]], None]) -> None:
self._handler = handler
self._reader.set_handler(handler)

def run(self) -> None:
"""
Same loop as SerialReader.run(), but using watchdog-aware read_line().
"""
while True:
try:
raw = self.read_line()
log.info(f"RAW: {raw!r}")

parsed = parse_geiger_csv(raw)
log.info(f"PARSED: {parsed}")

if parsed is not None and self._handler is not None:
self._handler(parsed)

except (KeyboardInterrupt, StopIteration):
break

except Exception as exc:
log.error(f"Error in watchdog serial loop: {exc}")
time.sleep(0.1)

# ------------------------------------------------------------
# Watchdog logic
# ------------------------------------------------------------

def read_line(self) -> str:
now = time.time()

# Dead link detection
if now - self._last_frame_ts > self._dead_threshold:
log.warning(
"watchdog_dead_link_detected",
extra={"last_frame_age": now - self._last_frame_ts},
)
self._reopen()

try:
line = self._reader.read_line()
except Exception as exc:
log.error("watchdog_read_exception", extra={"error": repr(exc)})
self._reopen()
line = self._reader.read_line()

if line:
self._last_frame_ts = time.time()

return line

def _reopen(self) -> None:
log.warning("watchdog_reopen_start")

try:
# Best-effort close with proper type narrowing
ser = getattr(self._reader, "ser", None)
if ser is not None:
try:
ser.close()
except Exception as exc:
log.error("watchdog_close_failed", extra={"error": repr(exc)})

# Force lazy reopen
self._reader.ser = None

time.sleep(self._reopen_sleep)

log.warning("watchdog_reopen_success")

except Exception as exc:
log.error("watchdog_reopen_failed", extra={"error": repr(exc)})
raise
95 changes: 95 additions & 0 deletions tests/unit/test_watchdog_serial_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# filename: tests/unit/test_watchdog_serial_reader.py

import time
import pytest

from app.ingestion.watchdog import WatchdogSerialReader


class MockSerial:
def __init__(self):
self.closed = False

def close(self):
self.closed = True


class MockReader:
def __init__(self, lines=None, raise_on_call=False):
self.lines = lines or []
self.raise_on_call = raise_on_call
self.calls = 0
self.handler = None
self.ser = MockSerial()

def set_handler(self, handler):
self.handler = handler

def read_line(self):
self.calls += 1
if self.raise_on_call:
raise RuntimeError("boom")
if self.lines:
return self.lines.pop(0)
return ""



def test_watchdog_proxies_set_handler():
mock = MockReader()
wd = WatchdogSerialReader(mock)

def handler(_):
pass

wd.set_handler(handler)

assert mock.handler is handler


def test_watchdog_updates_last_frame_timestamp_on_data():
mock = MockReader(lines=["abc"])
wd = WatchdogSerialReader(mock)

before = wd._last_frame_ts
time.sleep(0.01)

line = wd.read_line()

assert line == "abc"
assert wd._last_frame_ts > before


def test_watchdog_triggers_reopen_on_exception(monkeypatch):
mock = MockReader(raise_on_call=True)
wd = WatchdogSerialReader(mock)

reopened = {"called": False}

def fake_reopen():
reopened["called"] = True

monkeypatch.setattr(wd, "_reopen", fake_reopen)

# read_line should catch the exception and call _reopen()
with pytest.raises(RuntimeError):
# second call after reopen will still raise, so we expect the error
wd.read_line()

assert reopened["called"] is True


def test_watchdog_triggers_reopen_on_dead_link(monkeypatch):
mock = MockReader(lines=[""])
wd = WatchdogSerialReader(mock, dead_threshold_seconds=0.0)

reopened = {"called": False}

def fake_reopen():
reopened["called"] = True

monkeypatch.setattr(wd, "_reopen", fake_reopen)

wd.read_line()

assert reopened["called"] is True