Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions app/config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,13 @@ def load_config(
- Valid file → return SettingsNamespace
"""
path = Path(path)
print(">>> loading:", path)

if not path.exists():
return {}

try:
with path.open("rb") as f:
data: Any = tomllib.load(f)
print(">>> parsed:", data)

except Exception:
return {}
Expand Down
48 changes: 29 additions & 19 deletions app/ingestion/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ class PushClient:
PushClient is the ingestion engine:
- receives parsed records via handle_record()
- writes them to SQLite
- pushes them immediately to the ingestion API
- pushes them immediately to the Beamwarden ingest API
- marks them pushed on success

Device identity headers (X-Device-Name, X-Device-Token) are added to every
push request so Beamwarden can authenticate the device.
Beamwarden identifies devices by serial_number (device_name) and
authenticates via a per-device bearer token (device_token).
"""

def __init__(
Expand All @@ -39,8 +39,9 @@ def __init__(
self.device_id = device_id
self.db_path = db_path

# Device identity for Beamwarden
# serial_number used to look up the Beamrider in Beamwarden
self.device_name = device_name or device_id
# bearer token for Beamwarden device authentication
self.device_token = device_token or ""

self._conn = sqlite3.connect(self.db_path, check_same_thread=False)
Expand Down Expand Up @@ -104,31 +105,40 @@ def _mark_pushed(self, row_id: int) -> None:

def _push_single(self, record: GeigerRecord) -> bool:
"""
Push a single GeigerRecord to the ingestion endpoint.
Returns True on success.
"""
Push a single GeigerRecord to the Beamwarden ingest endpoint.

headers = {
"X-Device-Name": self.device_name,
"X-Device-Token": self.device_token,
}
Beamwarden contract (POST /api/readings):
sensor_type — logical sensor name (e.g. "geiger")
payload — arbitrary JSON with the sensor reading
timestamp — device-side ISO8601 timestamp

Device identity is established entirely by the bearer token —
no serial number is needed in the request body.

Returns True on success (HTTP 201).
"""

if self.api_token:
headers: Dict[str, str] = {}
if self.device_token:
headers["Authorization"] = f"Bearer {self.device_token}"
elif self.api_token:
headers["Authorization"] = f"Bearer {self.api_token}"

payload = {
"counts_per_second": record.counts_per_second,
"counts_per_minute": record.counts_per_minute,
"microsieverts_per_hour": record.microsieverts_per_hour,
"mode": record.mode,
"device_id": record.device_id,
body = {
"sensor_type": "geiger",
"payload": {
"cps": record.counts_per_second,
"cpm": record.counts_per_minute,
"uSv_h": record.microsieverts_per_hour,
"mode": record.mode,
},
"timestamp": record.timestamp.isoformat(),
}

try:
resp = requests.post(
self.ingest_url,
json=payload,
json=body,
headers=headers,
timeout=5,
)
Expand Down
81 changes: 59 additions & 22 deletions app/ingestion/geiger_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import argparse
import logging
import sys
import tomllib
from typing import Any, Dict

from app.ingestion.api_client import PushClient
from app.ingestion.serial_reader import SerialReader
Expand All @@ -16,50 +18,85 @@ def build_parser() -> argparse.ArgumentParser:
description="Ingestion loop for MightyOhm Geiger counter readings.",
)

parser.add_argument("--device", required=True, type=str)
parser.add_argument("--baudrate", required=True, type=int)
parser.add_argument("--device-type", required=True, choices=["mightyohm"])
parser.add_argument("--db", required=True, type=str)
parser.add_argument("--api-url", required=True, type=str)
parser.add_argument("--config", required=False, type=str,
help="Path to TOML config file (overrides individual flags)")
parser.add_argument("--device", required=False, type=str)
parser.add_argument("--baudrate", required=False, type=int, default=9600)
parser.add_argument("--device-type", required=False, default="mightyohm",
choices=["mightyohm"])
parser.add_argument("--db", required=False, type=str)
parser.add_argument("--api-url", required=False, type=str)
parser.add_argument("--api-token", required=False, default="", type=str)
parser.add_argument("--device-id", required=True, type=str)
parser.add_argument("--device-id", required=False, type=str)
parser.add_argument("--device-name", required=False, type=str,
help="Beamwarden serial_number for this device")
parser.add_argument("--device-token", required=False, default="", type=str,
help="Beamwarden device bearer token")

return parser


def _load_toml(path: str) -> Dict[str, Any]:
with open(path, "rb") as f:
return tomllib.load(f)


def main() -> int:
args = build_parser().parse_args()

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)

if args.config:
cfg = _load_toml(args.config)
serial_device = cfg["serial"]["device"]
baudrate = int(cfg["serial"].get("baudrate", 9600))
db_path = cfg["storage"]["db_path"]
api_url = cfg["push"]["url"]
api_token = cfg["push"].get("api_token", "")
device_name = cfg["device"]["name"]
device_token = cfg["device"].get("token", "")
device_id = device_name
else:
if not args.device or not args.db or not args.api_url or not args.device_id:
logging.error("--config or (--device --db --api-url --device-id) required")
return 1
serial_device = args.device
baudrate = args.baudrate
db_path = args.db
api_url = args.api_url
api_token = args.api_token
device_name = args.device_name or args.device_id
device_token = args.device_token
device_id = args.device_id

start_health_server()

logging.info("Starting ingestion agent")
logging.info(f"Device: {args.device}")
logging.info(f"Baudrate: {args.baudrate}")
logging.info(f"Device type: {args.device_type}")
logging.info(f"DB path: {args.db}")
logging.info(f"API URL: {args.api_url}")
logging.info(
"API token: <empty>" if args.api_token == "" else "API token: <provided>"
)
logging.info(f"Device ID: {args.device_id}")
logging.info(f"Device: {serial_device}")
logging.info(f"Baudrate: {baudrate}")
logging.info(f"DB path: {db_path}")
logging.info(f"API URL: {api_url}")
logging.info("API token: <empty>" if not api_token else "API token: <provided>")
logging.info(f"Device name: {device_name}")
logging.info("Device token: <empty>" if not device_token else "Device token: <provided>")

base_reader = SerialReader(
device=args.device,
baudrate=args.baudrate,
device=serial_device,
baudrate=baudrate,
)

reader = WatchdogSerialReader(base_reader)


client = PushClient(
api_url=args.api_url,
api_token=args.api_token,
device_id=args.device_id,
db_path=args.db,
api_url=api_url,
api_token=api_token,
device_id=device_id,
db_path=db_path,
device_name=device_name,
device_token=device_token,
)

reader.set_handler(client.handle_record)
Expand Down
33 changes: 0 additions & 33 deletions app/logexp_client.py

This file was deleted.

29 changes: 17 additions & 12 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ class GeigerRecord:
Canonical local representation of a single Geiger reading in Pi-log.

This is the shape we store in SQLite and use as the source for pushes
to LogExp. It keeps the raw MightyOhm CSV line for debugging and
diagnostics, but the wire contract with LogExp uses only the canonical
ingestion fields (no raw or local timestamp).
to Beamwarden. It keeps the raw MightyOhm CSV line for debugging and
diagnostics, but the wire contract with Beamwarden wraps the sensor
fields inside a payload envelope (see to_ingest_payload()).

Fields:
id: Optional database primary key (None before insert).
Expand All @@ -24,9 +24,9 @@ class GeigerRecord:
counts_per_minute: CPM value parsed from the CSV.
microsieverts_per_hour: uSv/hr value parsed from the CSV.
mode: One of "SLOW", "FAST", or "INST".
device_id: Logical identifier for this Pi-log node (e.g. "pi-log").
device_id: Logical identifier for this Pi-log node (e.g. "beamrider-0001").
timestamp: UTC timestamp recorded locally when the reading was created.
pushed: Whether this reading has been successfully pushed to LogExp.
pushed: Whether this reading has been successfully pushed to Beamwarden.
"""

id: Optional[int]
Expand Down Expand Up @@ -73,15 +73,20 @@ def from_parsed(
)

# ------------------------------------------------------------
# Payload for LogExp ingestion API
# Payload for Beamwarden ingest API
# Beamwarden contract: POST /api/readings
# Device identity is established by the bearer token, not the body.
# ------------------------------------------------------------
def to_logexp_payload(self) -> dict[str, Any]:
def to_ingest_payload(self) -> dict[str, Any]:
return {
"counts_per_second": self.counts_per_second,
"counts_per_minute": self.counts_per_minute,
"microsieverts_per_hour": self.microsieverts_per_hour,
"mode": self.mode.upper(),
"device_id": self.device_id,
"sensor_type": "geiger",
"payload": {
"cps": self.counts_per_second,
"cpm": self.counts_per_minute,
"uSv_h": self.microsieverts_per_hour,
"mode": self.mode.upper(),
},
"timestamp": self.timestamp.isoformat(),
}

# ------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(self, raw: Optional[Dict[str, Any]]) -> None:
self.push = Section(raw.get("push", {}))
self.ingestion = Section(raw.get("ingestion", {}))
self.telemetry = Section(raw.get("telemetry", {}))
self.device = Section(raw.get("device", {}))

@classmethod
def from_dict(cls, raw: Dict[str, Any]) -> "Settings":
Expand Down
5 changes: 4 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os # noqa: F401
import sqlite3
from datetime import datetime, timezone
import pytest
from unittest.mock import MagicMock, patch
from fastapi.testclient import TestClient
Expand Down Expand Up @@ -90,12 +91,14 @@ def push_client(tmp_path):
def geiger_record():
def _factory(**overrides):
base = {
"id": None,
"raw": "RAW",
"counts_per_second": 10,
"counts_per_minute": 600,
"microsieverts_per_hour": 0.10,
"mode": "FAST",
"device_id": "pi-log",
"device_id": "beamrider-0001",
"timestamp": datetime.now(timezone.utc),
}
base.update(overrides)
return GeigerRecord(**base)
Expand Down
Loading