Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added modules/common/__init__.py
Empty file.
123 changes: 123 additions & 0 deletions modules/common/cbor_raw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Shared CBOR record reader for SleepyPod biometrics modules.

The cbor2 C extension (_cbor2) reads files in internal 4096-byte chunks,
so cbor2.load(f) advances f.tell() by 4096 bytes regardless of the actual
record size. Since RAW file records are typically 17-5000 bytes, this causes
nearly every record to be skipped silently.

This module provides a manual parser for the outer {seq, data} CBOR wrapper
that reads byte-by-byte using f.read(), keeping f.tell() accurate.

Protocol contract: Pod firmware always emits the outer map with keys in the
order "seq" then "data". The seq value is encoded as a fixed-width uint32
(0x1a) by the embedded CBOR encoder, regardless of value. This parser
validates that contract and raises ValueError on deviation.

See: https://github.com/throwaway31265/free-sleep/pull/46
"""

import struct

# Reject implausibly large data payloads (corrupt length field protection)
_MAX_DATA_LENGTH = 1_000_000


def read_raw_record(f):
"""Parse one outer {seq, data} CBOR record from file object *f*.

Returns the raw inner data bytes, or ``None`` for empty placeholder
records (Pod firmware writes ``data=b''`` as sequence-number markers).

Raises:
EOFError: End of file or incomplete record (not yet fully written).
ValueError: Malformed CBOR structure.
"""
b = f.read(1)
if not b:
raise EOFError
if b[0] != 0xa2:
raise ValueError('Expected outer map 0xa2, got 0x%02x' % b[0])

# "seq" key — text(3) "seq"
# Pod firmware always emits keys in order: seq, then data.
seq_key = f.read(4)
if len(seq_key) < 4:
raise EOFError
if seq_key != b'\x63\x73\x65\x71':
raise ValueError('Expected seq key')

# seq value — Pod firmware uses fixed-width uint32 (0x1a), but we accept
# all valid CBOR unsigned integer encodings for forward compatibility.
hdr = f.read(1)
if not hdr:
raise EOFError
mt = hdr[0] >> 5
ai = hdr[0] & 0x1f
if mt != 0:
raise ValueError('seq must be unsigned int, got major type %d' % mt)
if ai <= 23:
pass # inline value, no additional bytes
elif ai == 24:
if len(f.read(1)) < 1:
raise EOFError
elif ai == 25:
if len(f.read(2)) < 2:
raise EOFError
elif ai == 26:
if len(f.read(4)) < 4:
raise EOFError
elif ai == 27:
if len(f.read(8)) < 8:
raise EOFError
else:
raise ValueError('Unexpected seq encoding: 0x%02x' % hdr[0])

# "data" key — text(4) "data"
data_key = f.read(5)
if len(data_key) < 5:
raise EOFError
if data_key != b'\x64\x64\x61\x74\x61':
raise ValueError('Expected data key')

# data value — must be a byte string (major type 2)
bs = f.read(1)
if not bs:
raise EOFError
if bs[0] >> 5 != 2:
raise ValueError('data must be a byte string, got major type %d' % (bs[0] >> 5))
ai = bs[0] & 0x1f
if ai <= 23:
length = ai
elif ai == 24:
lb = f.read(1)
if not lb:
raise EOFError
length = lb[0]
elif ai == 25:
lb = f.read(2)
if len(lb) < 2:
raise EOFError
length = struct.unpack('>H', lb)[0]
elif ai == 26:
lb = f.read(4)
if len(lb) < 4:
raise EOFError
length = struct.unpack('>I', lb)[0]
elif ai == 27:
lb = f.read(8)
if len(lb) < 8:
raise EOFError
length = struct.unpack('>Q', lb)[0]
else:
raise ValueError('Unsupported data length encoding: %d' % ai)

if length > _MAX_DATA_LENGTH:
raise ValueError('Implausibly large data field: %d bytes' % length)

data = f.read(length)
if len(data) < length:
raise EOFError
Comment on lines +42 to +120
if not data: # length==0: firmware placeholder record (sequence-number marker)
return None
return data
100 changes: 100 additions & 0 deletions modules/common/raw_follower.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""
Shared RAW file follower for SleepyPod biometrics modules.

Tails the newest .RAW file in a directory, yielding decoded CBOR records
as they are appended by the hardware daemon.
"""

import time
import logging
from pathlib import Path
from typing import Optional

import cbor2

from common.cbor_raw import read_raw_record

log = logging.getLogger(__name__)

MAX_CONSECUTIVE_FAILURES = 5


def _safe_mtime(p: Path) -> float:
"""Return mtime, or 0.0 if the file was deleted between glob and stat."""
try:
return p.stat().st_mtime
except FileNotFoundError:
return 0.0


class RawFileFollower:
"""Follow the newest .RAW file, yielding decoded CBOR records.

Args:
data_dir: Directory containing .RAW files.
shutdown_event: A threading.Event that signals shutdown.
poll_interval: Seconds to sleep when no new data is available.
"""

def __init__(self, data_dir: Path, shutdown_event, poll_interval: float = 0.01):
self.data_dir = data_dir
self._shutdown = shutdown_event
self._poll_interval = poll_interval
self._file = None
self._path = None
self._last_pos = 0
self._consecutive_failures = 0

def _find_latest(self) -> Optional[Path]:
candidates = [p for p in self.data_dir.glob("*.RAW") if _safe_mtime(p) > 0]
candidates.sort(key=_safe_mtime, reverse=True)
return candidates[0] if candidates else None

def read_records(self):
"""Yield decoded CBOR records as they arrive, sleeping between poll attempts."""
while not self._shutdown.is_set():
latest = self._find_latest()
if latest is None:
time.sleep(1)
continue

if latest != self._path:
log.info("Switched to RAW file: %s", latest.name)
if self._file:
self._file.close()
self._file = open(latest, "rb")
self._path = latest
self._last_pos = 0
self._consecutive_failures = 0

try:
data_bytes = read_raw_record(self._file)
if data_bytes is None:
self._last_pos = self._file.tell()
self._consecutive_failures = 0
continue # empty placeholder record
inner = cbor2.loads(data_bytes)
self._last_pos = self._file.tell()
self._consecutive_failures = 0
yield inner
except (ValueError, cbor2.CBORDecodeError, OSError) as e:
self._consecutive_failures += 1
if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
log.warning("Skipping past corrupt data at offset %d after %d failures: %s",
self._last_pos, self._consecutive_failures, e)
self._last_pos += 1
self._consecutive_failures = 0
else:
log.debug("Error reading RAW record (attempt %d): %s",
self._consecutive_failures, e)
self._file.seek(self._last_pos)
time.sleep(0.1)
except EOFError:
self._file.seek(self._last_pos)
self._consecutive_failures = 0
time.sleep(self._poll_interval)

# Clean up file handle on shutdown
if self._file:
self._file.close()
self._file = None
73 changes: 17 additions & 56 deletions modules/piezo-processor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
from collections import deque
from typing import Optional

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

import cbor2
from common.raw_follower import RawFileFollower
import numpy as np
from scipy.signal import butter, filtfilt, welch
import heartpy as hp
Expand Down Expand Up @@ -98,17 +101,19 @@ def report_health(status: str, message: str) -> None:
"""Write module health to sleepypod.db system_health table."""
try:
conn = sqlite3.connect(str(SLEEPYPOD_DB), timeout=2.0)
with conn:
conn.execute(
"""INSERT INTO system_health (component, status, message, last_checked)
VALUES ('piezo-processor', ?, ?, ?)
ON CONFLICT(component) DO UPDATE SET
status=excluded.status,
message=excluded.message,
last_checked=excluded.last_checked""",
(status, message, int(time.time())),
)
conn.close()
try:
with conn:
conn.execute(
"""INSERT INTO system_health (component, status, message, last_checked)
VALUES ('piezo-processor', ?, ?, ?)
ON CONFLICT(component) DO UPDATE SET
status=excluded.status,
message=excluded.message,
last_checked=excluded.last_checked""",
(status, message, int(time.time())),
)
finally:
conn.close()
except Exception as e:
log.warning("Could not write health status: %s", e)

Expand Down Expand Up @@ -172,50 +177,6 @@ def compute_breathing_rate(samples: np.ndarray, fs: float = SAMPLE_RATE) -> Opti
log.debug("Breathing rate computation failed: %s", e)
return None

# ---------------------------------------------------------------------------
# RAW file follower
# ---------------------------------------------------------------------------

class RawFileFollower:
"""
Follows the newest .RAW file in RAW_DATA_DIR, tailing it as new CBOR
records are appended by the hardware daemon.
"""

def __init__(self, data_dir: Path):
self.data_dir = data_dir
self._file = None
self._path = None

def _find_latest(self) -> Optional[Path]:
candidates = sorted(self.data_dir.glob("*.RAW"), key=lambda p: p.stat().st_mtime, reverse=True)
return candidates[0] if candidates else None

def read_records(self):
"""Yield decoded CBOR records as they arrive. Blocks between records."""
while not _shutdown.is_set():
latest = self._find_latest()
if latest is None:
time.sleep(1)
continue

if latest != self._path:
log.info("Switched to RAW file: %s", latest.name)
if self._file:
self._file.close()
self._file = open(latest, "rb")
self._path = latest

try:
record = cbor2.load(self._file)
inner = cbor2.loads(record["data"])
yield inner
except EOFError:
# No new data yet — poll
time.sleep(0.01)
except Exception as e:
log.warning("Error reading RAW record: %s", e)
time.sleep(1)

# ---------------------------------------------------------------------------
# Per-side processor
Expand Down Expand Up @@ -271,7 +232,7 @@ def main() -> None:
db_conn = open_biometrics_db()
left = SideProcessor("left", db_conn)
right = SideProcessor("right", db_conn)
follower = RawFileFollower(RAW_DATA_DIR)
follower = RawFileFollower(RAW_DATA_DIR, _shutdown, poll_interval=0.01)

report_health("healthy", "piezo-processor started")

Expand Down
Loading