From 98610ed838869db0772e81ab6734c14affd65bc8 Mon Sep 17 00:00:00 2001 From: Jeb Baugh Date: Sat, 7 Mar 2026 16:51:30 -0600 Subject: [PATCH 1/2] Align pi-log with Beamwarden ingest contract and Quasar beamrider-agent role - Fix PushClient._push_single() to send Beamwarden envelope: beamrider_serial / sensor_name / recorded_at / payload - Add --config support to geiger_reader.py matching the Quasar beamrider-agent service template; add --device-name / --device-token CLI args and wire them into PushClient - Add device section to Settings (name, token) - Rename GeigerRecord.to_logexp_payload() -> to_ingest_payload() with correct Beamwarden shape - Remove debug print() statements from config_loader.py - Delete orphaned logexp_client.py - Fix geiger_record fixture: add id=None and timestamp defaults Co-Authored-By: Claude Sonnet 4.6 --- app/config_loader.py | 2 - app/ingestion/api_client.py | 49 +++++++++++--------- app/ingestion/geiger_reader.py | 81 +++++++++++++++++++++++++--------- app/logexp_client.py | 33 -------------- app/models.py | 29 +++++++----- app/settings.py | 1 + tests/conftest.py | 5 ++- 7 files changed, 110 insertions(+), 90 deletions(-) delete mode 100755 app/logexp_client.py diff --git a/app/config_loader.py b/app/config_loader.py index 2d4c9a0..7a2e262 100644 --- a/app/config_loader.py +++ b/app/config_loader.py @@ -41,7 +41,6 @@ def load_config( - Valid file → return SettingsNamespace """ path = Path(path) - print(">>> loading:", path) if not path.exists(): return {} @@ -49,7 +48,6 @@ def load_config( try: with path.open("rb") as f: data: Any = tomllib.load(f) - print(">>> parsed:", data) except Exception: return {} diff --git a/app/ingestion/api_client.py b/app/ingestion/api_client.py index b25e67f..cbf5dfb 100755 --- a/app/ingestion/api_client.py +++ b/app/ingestion/api_client.py @@ -15,11 +15,11 @@ class PushClient: PushClient is the ingestion engine: - receives parsed records via handle_record() - writes them to SQLite - - pushes them immediately to the ingestion API + - pushes them immediately to the Beamwarden ingest API - marks them pushed on success - Device identity headers (X-Device-Name, X-Device-Token) are added to every - push request so Beamwarden can authenticate the device. + Beamwarden identifies devices by serial_number (device_name) and + authenticates via a per-device bearer token (device_token). """ def __init__( @@ -39,8 +39,9 @@ def __init__( self.device_id = device_id self.db_path = db_path - # Device identity for Beamwarden + # serial_number used to look up the Beamrider in Beamwarden self.device_name = device_name or device_id + # bearer token for Beamwarden device authentication self.device_token = device_token or "" self._conn = sqlite3.connect(self.db_path, check_same_thread=False) @@ -104,31 +105,39 @@ def _mark_pushed(self, row_id: int) -> None: def _push_single(self, record: GeigerRecord) -> bool: """ - Push a single GeigerRecord to the ingestion endpoint. - Returns True on success. - """ + Push a single GeigerRecord to the Beamwarden ingest endpoint. - headers = { - "X-Device-Name": self.device_name, - "X-Device-Token": self.device_token, - } + Beamwarden contract (POST /api/readings/ingest/): + beamrider_serial — device serial_number registered in Beamwarden + sensor_name — logical sensor name (e.g. "geiger") + recorded_at — device-side ISO8601 timestamp + payload — arbitrary JSON with the sensor reading + + Returns True on success (HTTP 201). + """ - if self.api_token: + headers: Dict[str, str] = {} + if self.device_token: + headers["Authorization"] = f"Bearer {self.device_token}" + elif self.api_token: headers["Authorization"] = f"Bearer {self.api_token}" - payload = { - "counts_per_second": record.counts_per_second, - "counts_per_minute": record.counts_per_minute, - "microsieverts_per_hour": record.microsieverts_per_hour, - "mode": record.mode, - "device_id": record.device_id, - "timestamp": record.timestamp.isoformat(), + body = { + "beamrider_serial": self.device_name, + "sensor_name": "geiger", + "recorded_at": record.timestamp.isoformat(), + "payload": { + "cps": record.counts_per_second, + "cpm": record.counts_per_minute, + "uSv_h": record.microsieverts_per_hour, + "mode": record.mode, + }, } try: resp = requests.post( self.ingest_url, - json=payload, + json=body, headers=headers, timeout=5, ) diff --git a/app/ingestion/geiger_reader.py b/app/ingestion/geiger_reader.py index 69385d2..e132d0a 100755 --- a/app/ingestion/geiger_reader.py +++ b/app/ingestion/geiger_reader.py @@ -3,6 +3,8 @@ import argparse import logging import sys +import tomllib +from typing import Any, Dict from app.ingestion.api_client import PushClient from app.ingestion.serial_reader import SerialReader @@ -16,17 +18,29 @@ def build_parser() -> argparse.ArgumentParser: description="Ingestion loop for MightyOhm Geiger counter readings.", ) - parser.add_argument("--device", required=True, type=str) - parser.add_argument("--baudrate", required=True, type=int) - parser.add_argument("--device-type", required=True, choices=["mightyohm"]) - parser.add_argument("--db", required=True, type=str) - parser.add_argument("--api-url", required=True, type=str) + parser.add_argument("--config", required=False, type=str, + help="Path to TOML config file (overrides individual flags)") + parser.add_argument("--device", required=False, type=str) + parser.add_argument("--baudrate", required=False, type=int, default=9600) + parser.add_argument("--device-type", required=False, default="mightyohm", + choices=["mightyohm"]) + parser.add_argument("--db", required=False, type=str) + parser.add_argument("--api-url", required=False, type=str) parser.add_argument("--api-token", required=False, default="", type=str) - parser.add_argument("--device-id", required=True, type=str) + parser.add_argument("--device-id", required=False, type=str) + parser.add_argument("--device-name", required=False, type=str, + help="Beamwarden serial_number for this device") + parser.add_argument("--device-token", required=False, default="", type=str, + help="Beamwarden device bearer token") return parser +def _load_toml(path: str) -> Dict[str, Any]: + with open(path, "rb") as f: + return tomllib.load(f) + + def main() -> int: args = build_parser().parse_args() @@ -34,32 +48,55 @@ def main() -> int: level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) + + if args.config: + cfg = _load_toml(args.config) + serial_device = cfg["serial"]["device"] + baudrate = int(cfg["serial"].get("baudrate", 9600)) + db_path = cfg["storage"]["db_path"] + api_url = cfg["push"]["url"] + api_token = cfg["push"].get("api_token", "") + device_name = cfg["device"]["name"] + device_token = cfg["device"].get("token", "") + device_id = device_name + else: + if not args.device or not args.db or not args.api_url or not args.device_id: + logging.error("--config or (--device --db --api-url --device-id) required") + return 1 + serial_device = args.device + baudrate = args.baudrate + db_path = args.db + api_url = args.api_url + api_token = args.api_token + device_name = args.device_name or args.device_id + device_token = args.device_token + device_id = args.device_id + start_health_server() logging.info("Starting ingestion agent") - logging.info(f"Device: {args.device}") - logging.info(f"Baudrate: {args.baudrate}") - logging.info(f"Device type: {args.device_type}") - logging.info(f"DB path: {args.db}") - logging.info(f"API URL: {args.api_url}") - logging.info( - "API token: " if args.api_token == "" else "API token: " - ) - logging.info(f"Device ID: {args.device_id}") + logging.info(f"Device: {serial_device}") + logging.info(f"Baudrate: {baudrate}") + logging.info(f"DB path: {db_path}") + logging.info(f"API URL: {api_url}") + logging.info("API token: " if not api_token else "API token: ") + logging.info(f"Device name: {device_name}") + logging.info("Device token: " if not device_token else "Device token: ") base_reader = SerialReader( - device=args.device, - baudrate=args.baudrate, + device=serial_device, + baudrate=baudrate, ) reader = WatchdogSerialReader(base_reader) - client = PushClient( - api_url=args.api_url, - api_token=args.api_token, - device_id=args.device_id, - db_path=args.db, + api_url=api_url, + api_token=api_token, + device_id=device_id, + db_path=db_path, + device_name=device_name, + device_token=device_token, ) reader.set_handler(client.handle_record) diff --git a/app/logexp_client.py b/app/logexp_client.py deleted file mode 100755 index d7522f8..0000000 --- a/app/logexp_client.py +++ /dev/null @@ -1,33 +0,0 @@ -import requests -import logging - - -class LogExpClient: - """ - Minimal client for pushing readings to LogExp. - """ - - def __init__(self, base_url: str, token: str): - self.base_url = base_url.rstrip("/") - self.token = token - self.log = logging.getLogger(__name__) - - def push(self, record_id: int, record: dict) -> bool: - """ - Push a reading to LogExp. - - Returns True on success, False on failure. - """ - try: - resp = requests.post( - f"{self.base_url}/api/readings", - json={"id": record_id, **record}, - headers={"X-API-Key": self.token}, - timeout=5, - ) - resp.raise_for_status() - return True - - except Exception as exc: - self.log.error(f"LogExp push failed: {exc}") - return False diff --git a/app/models.py b/app/models.py index a840e83..ada21c6 100755 --- a/app/models.py +++ b/app/models.py @@ -13,9 +13,9 @@ class GeigerRecord: Canonical local representation of a single Geiger reading in Pi-log. This is the shape we store in SQLite and use as the source for pushes - to LogExp. It keeps the raw MightyOhm CSV line for debugging and - diagnostics, but the wire contract with LogExp uses only the canonical - ingestion fields (no raw or local timestamp). + to Beamwarden. It keeps the raw MightyOhm CSV line for debugging and + diagnostics, but the wire contract with Beamwarden wraps the sensor + fields inside a payload envelope (see to_ingest_payload()). Fields: id: Optional database primary key (None before insert). @@ -24,9 +24,9 @@ class GeigerRecord: counts_per_minute: CPM value parsed from the CSV. microsieverts_per_hour: uSv/hr value parsed from the CSV. mode: One of "SLOW", "FAST", or "INST". - device_id: Logical identifier for this Pi-log node (e.g. "pi-log"). + device_id: Logical identifier for this Pi-log node (e.g. "beamrider-0001"). timestamp: UTC timestamp recorded locally when the reading was created. - pushed: Whether this reading has been successfully pushed to LogExp. + pushed: Whether this reading has been successfully pushed to Beamwarden. """ id: Optional[int] @@ -73,15 +73,20 @@ def from_parsed( ) # ------------------------------------------------------------ - # Payload for LogExp ingestion API + # Payload for Beamwarden ingest API + # Beamwarden contract: POST /api/readings/ingest/ # ------------------------------------------------------------ - def to_logexp_payload(self) -> dict[str, Any]: + def to_ingest_payload(self, beamrider_serial: str) -> dict[str, Any]: return { - "counts_per_second": self.counts_per_second, - "counts_per_minute": self.counts_per_minute, - "microsieverts_per_hour": self.microsieverts_per_hour, - "mode": self.mode.upper(), - "device_id": self.device_id, + "beamrider_serial": beamrider_serial, + "sensor_name": "geiger", + "recorded_at": self.timestamp.isoformat(), + "payload": { + "cps": self.counts_per_second, + "cpm": self.counts_per_minute, + "uSv_h": self.microsieverts_per_hour, + "mode": self.mode.upper(), + }, } # ------------------------------------------------------------ diff --git a/app/settings.py b/app/settings.py index 9d90717..016acfc 100644 --- a/app/settings.py +++ b/app/settings.py @@ -39,6 +39,7 @@ def __init__(self, raw: Optional[Dict[str, Any]]) -> None: self.push = Section(raw.get("push", {})) self.ingestion = Section(raw.get("ingestion", {})) self.telemetry = Section(raw.get("telemetry", {})) + self.device = Section(raw.get("device", {})) @classmethod def from_dict(cls, raw: Dict[str, Any]) -> "Settings": diff --git a/tests/conftest.py b/tests/conftest.py index d966408..c99ec0b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,6 +2,7 @@ import os # noqa: F401 import sqlite3 +from datetime import datetime, timezone import pytest from unittest.mock import MagicMock, patch from fastapi.testclient import TestClient @@ -90,12 +91,14 @@ def push_client(tmp_path): def geiger_record(): def _factory(**overrides): base = { + "id": None, "raw": "RAW", "counts_per_second": 10, "counts_per_minute": 600, "microsieverts_per_hour": 0.10, "mode": "FAST", - "device_id": "pi-log", + "device_id": "beamrider-0001", + "timestamp": datetime.now(timezone.utc), } base.update(overrides) return GeigerRecord(**base) From 8ee0a62885aae66cb0d60f46b9022bf3fcbde627 Mon Sep 17 00:00:00 2001 From: Jeb Baugh Date: Sun, 5 Apr 2026 16:23:11 -0500 Subject: [PATCH 2/2] Fix ingest payload to match PiLogIngestView contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PiLogIngestView (POST /api/readings) expects sensor_type + payload + timestamp — device identity comes from the bearer token alone, no beamrider_serial or sensor_name in the body. Co-Authored-By: Claude Sonnet 4.6 --- app/ingestion/api_client.py | 17 +++++++++-------- app/models.py | 10 +++++----- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/app/ingestion/api_client.py b/app/ingestion/api_client.py index cbf5dfb..796b61c 100755 --- a/app/ingestion/api_client.py +++ b/app/ingestion/api_client.py @@ -107,11 +107,13 @@ def _push_single(self, record: GeigerRecord) -> bool: """ Push a single GeigerRecord to the Beamwarden ingest endpoint. - Beamwarden contract (POST /api/readings/ingest/): - beamrider_serial — device serial_number registered in Beamwarden - sensor_name — logical sensor name (e.g. "geiger") - recorded_at — device-side ISO8601 timestamp - payload — arbitrary JSON with the sensor reading + Beamwarden contract (POST /api/readings): + sensor_type — logical sensor name (e.g. "geiger") + payload — arbitrary JSON with the sensor reading + timestamp — device-side ISO8601 timestamp + + Device identity is established entirely by the bearer token — + no serial number is needed in the request body. Returns True on success (HTTP 201). """ @@ -123,15 +125,14 @@ def _push_single(self, record: GeigerRecord) -> bool: headers["Authorization"] = f"Bearer {self.api_token}" body = { - "beamrider_serial": self.device_name, - "sensor_name": "geiger", - "recorded_at": record.timestamp.isoformat(), + "sensor_type": "geiger", "payload": { "cps": record.counts_per_second, "cpm": record.counts_per_minute, "uSv_h": record.microsieverts_per_hour, "mode": record.mode, }, + "timestamp": record.timestamp.isoformat(), } try: diff --git a/app/models.py b/app/models.py index ada21c6..f7fc3df 100755 --- a/app/models.py +++ b/app/models.py @@ -74,19 +74,19 @@ def from_parsed( # ------------------------------------------------------------ # Payload for Beamwarden ingest API - # Beamwarden contract: POST /api/readings/ingest/ + # Beamwarden contract: POST /api/readings + # Device identity is established by the bearer token, not the body. # ------------------------------------------------------------ - def to_ingest_payload(self, beamrider_serial: str) -> dict[str, Any]: + def to_ingest_payload(self) -> dict[str, Any]: return { - "beamrider_serial": beamrider_serial, - "sensor_name": "geiger", - "recorded_at": self.timestamp.isoformat(), + "sensor_type": "geiger", "payload": { "cps": self.counts_per_second, "cpm": self.counts_per_minute, "uSv_h": self.microsieverts_per_hour, "mode": self.mode.upper(), }, + "timestamp": self.timestamp.isoformat(), } # ------------------------------------------------------------