From 615b6efa35732b988df23b79906976cfee4b6971 Mon Sep 17 00:00:00 2001 From: Jonathan Ng Date: Sat, 14 Mar 2026 16:02:30 -0700 Subject: [PATCH 1/3] fix(biometrics): bypass cbor2 C extension that silently skips records The cbor2 C extension (_cbor2) reads files in internal 4096-byte chunks, advancing f.tell() by 4096 regardless of actual record size. Since RAW records are 17-5000 bytes, cbor2.load(f) silently skips most records. On Pod 5, piezo-dual records are ~2700 bytes so nearly every other record was lost, severely degrading vitals accuracy. Additionally, Pod 5 firmware writes empty placeholder records (data=b'') as sequence markers. cbor2.loads(b'') raises CBORDecodeEOF (a subclass of EOFError), which the read loop caught as end-of-file, terminating reads mid-file with valid data remaining. Fix: Replace cbor2.load(f) with _read_raw_record(f) that manually parses the outer {seq, data} CBOR wrapper byte-by-byte, keeping f.tell() accurate. Empty placeholders return None and are skipped. On read errors, seek back to last known good position instead of breaking the loop. Applied to both piezo-processor and sleep-detector modules. Ref: throwaway31265/free-sleep#46 --- modules/piezo-processor/main.py | 85 ++++++++++++++++++++++++++++++++- modules/sleep-detector/main.py | 85 ++++++++++++++++++++++++++++++++- 2 files changed, 166 insertions(+), 4 deletions(-) diff --git a/modules/piezo-processor/main.py b/modules/piezo-processor/main.py index fc6e9fc5..ca1c4bde 100644 --- a/modules/piezo-processor/main.py +++ b/modules/piezo-processor/main.py @@ -27,6 +27,8 @@ from collections import deque from typing import Optional +import struct + import cbor2 import numpy as np from scipy.signal import butter, filtfilt, welch @@ -172,6 +174,78 @@ def compute_breathing_rate(samples: np.ndarray, fs: float = SAMPLE_RATE) -> Opti log.debug("Breathing rate computation failed: %s", e) return None +# --------------------------------------------------------------------------- +# CBOR record reader +# --------------------------------------------------------------------------- + +def _read_raw_record(f): + """ + Manually parse one outer {seq, data} CBOR record using f.read(). + + The cbor2 C extension (_cbor2) reads files in internal 4096-byte chunks, + so cbor2.load(f) advances f.tell() by 4096 bytes regardless of the actual + record size. Since RAW file records are typically 17-5000 bytes, this causes + nearly every record to be skipped silently. + + This function parses the outer {seq: uint, data: bytes} wrapper byte-by-byte + using f.read(), keeping f.tell() accurate after each record. + + Returns the raw inner data bytes, or None for empty placeholder records + (which the Pod firmware writes as sequence number markers with data=b''). + Raises EOFError at end of file, ValueError on malformed data. + """ + b = f.read(1) + if not b: + raise EOFError + if b[0] != 0xa2: + raise ValueError('Expected outer map 0xa2, got 0x%02x' % b[0]) + if f.read(4) != b'\x63\x73\x65\x71': + raise ValueError('Expected seq key') + hdr = f.read(1) + if not hdr: + raise EOFError + if hdr[0] == 0x1a: + seq_bytes = f.read(4) + if len(seq_bytes) < 4: + raise EOFError + elif hdr[0] == 0x1b: + seq_bytes = f.read(8) + if len(seq_bytes) < 8: + raise EOFError + else: + raise ValueError('Unexpected seq encoding: 0x%02x' % hdr[0]) + if f.read(5) != b'\x64\x64\x61\x74\x61': + raise ValueError('Expected data key') + bs = f.read(1) + if not bs: + raise EOFError + ai = bs[0] & 0x1f + if ai <= 23: + length = ai + elif ai == 24: + lb = f.read(1) + if not lb: + raise EOFError + length = lb[0] + elif ai == 25: + lb = f.read(2) + if len(lb) < 2: + raise EOFError + length = struct.unpack('>H', lb)[0] + elif ai == 26: + lb = f.read(4) + if len(lb) < 4: + raise EOFError + length = struct.unpack('>I', lb)[0] + else: + raise ValueError('Unsupported length encoding: %d' % ai) + data = f.read(length) + if len(data) < length: + raise EOFError + if not data: + return None # empty placeholder record, caller should skip + return data + # --------------------------------------------------------------------------- # RAW file follower # --------------------------------------------------------------------------- @@ -186,6 +260,7 @@ def __init__(self, data_dir: Path): self.data_dir = data_dir self._file = None self._path = None + self._last_pos = 0 def _find_latest(self) -> Optional[Path]: candidates = sorted(self.data_dir.glob("*.RAW"), key=lambda p: p.stat().st_mtime, reverse=True) @@ -205,16 +280,22 @@ def read_records(self): self._file.close() self._file = open(latest, "rb") self._path = latest + self._last_pos = 0 try: - record = cbor2.load(self._file) - inner = cbor2.loads(record["data"]) + data_bytes = _read_raw_record(self._file) + if data_bytes is None: + self._last_pos = self._file.tell() + continue # empty placeholder record + inner = cbor2.loads(data_bytes) + self._last_pos = self._file.tell() yield inner except EOFError: # No new data yet — poll time.sleep(0.01) except Exception as e: log.warning("Error reading RAW record: %s", e) + self._file.seek(self._last_pos) time.sleep(1) # --------------------------------------------------------------------------- diff --git a/modules/sleep-detector/main.py b/modules/sleep-detector/main.py index f4abbf19..67a49b13 100644 --- a/modules/sleep-detector/main.py +++ b/modules/sleep-detector/main.py @@ -29,6 +29,8 @@ from dataclasses import dataclass, field from typing import Optional +import struct + import cbor2 import numpy as np @@ -255,6 +257,78 @@ def _flush_movement(self, ts: float) -> None: self._movement_buf = [] self._last_movement_write = ts +# --------------------------------------------------------------------------- +# CBOR record reader +# --------------------------------------------------------------------------- + +def _read_raw_record(f): + """ + Manually parse one outer {seq, data} CBOR record using f.read(). + + The cbor2 C extension (_cbor2) reads files in internal 4096-byte chunks, + so cbor2.load(f) advances f.tell() by 4096 bytes regardless of the actual + record size. Since RAW file records are typically 17-5000 bytes, this causes + nearly every record to be skipped silently. + + This function parses the outer {seq: uint, data: bytes} wrapper byte-by-byte + using f.read(), keeping f.tell() accurate after each record. + + Returns the raw inner data bytes, or None for empty placeholder records + (which the Pod firmware writes as sequence number markers with data=b''). + Raises EOFError at end of file, ValueError on malformed data. + """ + b = f.read(1) + if not b: + raise EOFError + if b[0] != 0xa2: + raise ValueError('Expected outer map 0xa2, got 0x%02x' % b[0]) + if f.read(4) != b'\x63\x73\x65\x71': + raise ValueError('Expected seq key') + hdr = f.read(1) + if not hdr: + raise EOFError + if hdr[0] == 0x1a: + seq_bytes = f.read(4) + if len(seq_bytes) < 4: + raise EOFError + elif hdr[0] == 0x1b: + seq_bytes = f.read(8) + if len(seq_bytes) < 8: + raise EOFError + else: + raise ValueError('Unexpected seq encoding: 0x%02x' % hdr[0]) + if f.read(5) != b'\x64\x64\x61\x74\x61': + raise ValueError('Expected data key') + bs = f.read(1) + if not bs: + raise EOFError + ai = bs[0] & 0x1f + if ai <= 23: + length = ai + elif ai == 24: + lb = f.read(1) + if not lb: + raise EOFError + length = lb[0] + elif ai == 25: + lb = f.read(2) + if len(lb) < 2: + raise EOFError + length = struct.unpack('>H', lb)[0] + elif ai == 26: + lb = f.read(4) + if len(lb) < 4: + raise EOFError + length = struct.unpack('>I', lb)[0] + else: + raise ValueError('Unsupported length encoding: %d' % ai) + data = f.read(length) + if len(data) < length: + raise EOFError + if not data: + return None # empty placeholder record, caller should skip + return data + # --------------------------------------------------------------------------- # RAW file follower (same pattern as piezo-processor) # --------------------------------------------------------------------------- @@ -264,6 +338,7 @@ def __init__(self, data_dir: Path): self.data_dir = data_dir self._file = None self._path = None + self._last_pos = 0 def _find_latest(self): candidates = sorted(self.data_dir.glob("*.RAW"), key=lambda p: p.stat().st_mtime, reverse=True) @@ -282,15 +357,21 @@ def read_records(self): self._file.close() self._file = open(latest, "rb") self._path = latest + self._last_pos = 0 try: - record = cbor2.load(self._file) - inner = cbor2.loads(record["data"]) + data_bytes = _read_raw_record(self._file) + if data_bytes is None: + self._last_pos = self._file.tell() + continue # empty placeholder record + inner = cbor2.loads(data_bytes) + self._last_pos = self._file.tell() yield inner except EOFError: time.sleep(0.5) except Exception as e: log.warning("Error reading RAW record: %s", e) + self._file.seek(self._last_pos) time.sleep(1) # Clean up file handle on shutdown From 1ee7edba084f0eb48984124428454c69c5f91a46 Mon Sep 17 00:00:00 2001 From: Jonathan Ng Date: Sat, 14 Mar 2026 16:19:29 -0700 Subject: [PATCH 2/3] =?UTF-8?q?fix(biometrics):=20address=20review=20feedb?= =?UTF-8?q?ack=20=E2=80=94=20shared=20parser,=20cleanup,=20recovery?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract _read_raw_record into modules/common/cbor_raw.py to eliminate duplication between piezo-processor and sleep-detector - Add file handle cleanup on shutdown to piezo-processor (was already present in sleep-detector) - Add consecutive-failure counter to both modules: after 5 failures at the same file position, skip forward 1 byte to resync past corrupt data instead of retrying forever - Narrow exception handler from bare `except Exception` to `except (ValueError, cbor2.CBORDecodeError, OSError)` so only parsing/IO errors are retried Original CBOR fix ported from throwaway31265/free-sleep#46 by @seanpasino — thank you! Co-Authored-By: seanpasino --- modules/common/cbor_raw.py | 87 +++++++++++++++++++++++++++ modules/piezo-processor/main.py | 102 ++++++++------------------------ modules/sleep-detector/main.py | 97 +++++++----------------------- 3 files changed, 132 insertions(+), 154 deletions(-) create mode 100644 modules/common/cbor_raw.py diff --git a/modules/common/cbor_raw.py b/modules/common/cbor_raw.py new file mode 100644 index 00000000..6fb4a8d8 --- /dev/null +++ b/modules/common/cbor_raw.py @@ -0,0 +1,87 @@ +""" +Shared CBOR record reader for SleepyPod biometrics modules. + +The cbor2 C extension (_cbor2) reads files in internal 4096-byte chunks, +so cbor2.load(f) advances f.tell() by 4096 bytes regardless of the actual +record size. Since RAW file records are typically 17-5000 bytes, this causes +nearly every record to be skipped silently. + +This module provides a manual parser for the outer {seq, data} CBOR wrapper +that reads byte-by-byte using f.read(), keeping f.tell() accurate. + +See: https://github.com/throwaway31265/free-sleep/pull/46 +""" + +import struct + + +def read_raw_record(f): + """Parse one outer {seq, data} CBOR record from file object *f*. + + Returns the raw inner data bytes, or ``None`` for empty placeholder + records (Pod firmware writes ``data=b''`` as sequence-number markers). + + Raises: + EOFError: End of file (no more data to read). + ValueError: Malformed CBOR structure. + """ + b = f.read(1) + if not b: + raise EOFError + if b[0] != 0xa2: + raise ValueError('Expected outer map 0xa2, got 0x%02x' % b[0]) + + # "seq" key — text(3) "seq" + if f.read(4) != b'\x63\x73\x65\x71': + raise ValueError('Expected seq key') + + # seq value — uint32 (0x1a) or uint64 (0x1b) + hdr = f.read(1) + if not hdr: + raise EOFError + if hdr[0] == 0x1a: + seq_bytes = f.read(4) + if len(seq_bytes) < 4: + raise EOFError + elif hdr[0] == 0x1b: + seq_bytes = f.read(8) + if len(seq_bytes) < 8: + raise EOFError + else: + raise ValueError('Unexpected seq encoding: 0x%02x' % hdr[0]) + + # "data" key — text(4) "data" + if f.read(5) != b'\x64\x64\x61\x74\x61': + raise ValueError('Expected data key') + + # data value — byte string length + bs = f.read(1) + if not bs: + raise EOFError + ai = bs[0] & 0x1f + if ai <= 23: + length = ai + elif ai == 24: + lb = f.read(1) + if not lb: + raise EOFError + length = lb[0] + elif ai == 25: + lb = f.read(2) + if len(lb) < 2: + raise EOFError + length = struct.unpack('>H', lb)[0] + elif ai == 26: + lb = f.read(4) + if len(lb) < 4: + raise EOFError + length = struct.unpack('>I', lb)[0] + else: + raise ValueError('Unsupported length encoding: %d' % ai) + + data = f.read(length) + if len(data) < length: + raise EOFError + if not data: + return None # empty placeholder record, caller should skip + return data diff --git a/modules/piezo-processor/main.py b/modules/piezo-processor/main.py index ca1c4bde..c12b78d1 100644 --- a/modules/piezo-processor/main.py +++ b/modules/piezo-processor/main.py @@ -27,9 +27,10 @@ from collections import deque from typing import Optional -import struct +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) import cbor2 +from common.cbor_raw import read_raw_record import numpy as np from scipy.signal import butter, filtfilt, welch import heartpy as hp @@ -174,82 +175,12 @@ def compute_breathing_rate(samples: np.ndarray, fs: float = SAMPLE_RATE) -> Opti log.debug("Breathing rate computation failed: %s", e) return None -# --------------------------------------------------------------------------- -# CBOR record reader -# --------------------------------------------------------------------------- - -def _read_raw_record(f): - """ - Manually parse one outer {seq, data} CBOR record using f.read(). - - The cbor2 C extension (_cbor2) reads files in internal 4096-byte chunks, - so cbor2.load(f) advances f.tell() by 4096 bytes regardless of the actual - record size. Since RAW file records are typically 17-5000 bytes, this causes - nearly every record to be skipped silently. - - This function parses the outer {seq: uint, data: bytes} wrapper byte-by-byte - using f.read(), keeping f.tell() accurate after each record. - - Returns the raw inner data bytes, or None for empty placeholder records - (which the Pod firmware writes as sequence number markers with data=b''). - Raises EOFError at end of file, ValueError on malformed data. - """ - b = f.read(1) - if not b: - raise EOFError - if b[0] != 0xa2: - raise ValueError('Expected outer map 0xa2, got 0x%02x' % b[0]) - if f.read(4) != b'\x63\x73\x65\x71': - raise ValueError('Expected seq key') - hdr = f.read(1) - if not hdr: - raise EOFError - if hdr[0] == 0x1a: - seq_bytes = f.read(4) - if len(seq_bytes) < 4: - raise EOFError - elif hdr[0] == 0x1b: - seq_bytes = f.read(8) - if len(seq_bytes) < 8: - raise EOFError - else: - raise ValueError('Unexpected seq encoding: 0x%02x' % hdr[0]) - if f.read(5) != b'\x64\x64\x61\x74\x61': - raise ValueError('Expected data key') - bs = f.read(1) - if not bs: - raise EOFError - ai = bs[0] & 0x1f - if ai <= 23: - length = ai - elif ai == 24: - lb = f.read(1) - if not lb: - raise EOFError - length = lb[0] - elif ai == 25: - lb = f.read(2) - if len(lb) < 2: - raise EOFError - length = struct.unpack('>H', lb)[0] - elif ai == 26: - lb = f.read(4) - if len(lb) < 4: - raise EOFError - length = struct.unpack('>I', lb)[0] - else: - raise ValueError('Unsupported length encoding: %d' % ai) - data = f.read(length) - if len(data) < length: - raise EOFError - if not data: - return None # empty placeholder record, caller should skip - return data - # --------------------------------------------------------------------------- # RAW file follower # --------------------------------------------------------------------------- +MAX_CONSECUTIVE_FAILURES = 5 + class RawFileFollower: """ Follows the newest .RAW file in RAW_DATA_DIR, tailing it as new CBOR @@ -261,6 +192,7 @@ def __init__(self, data_dir: Path): self._file = None self._path = None self._last_pos = 0 + self._consecutive_failures = 0 def _find_latest(self) -> Optional[Path]: candidates = sorted(self.data_dir.glob("*.RAW"), key=lambda p: p.stat().st_mtime, reverse=True) @@ -281,22 +213,38 @@ def read_records(self): self._file = open(latest, "rb") self._path = latest self._last_pos = 0 + self._consecutive_failures = 0 try: - data_bytes = _read_raw_record(self._file) + data_bytes = read_raw_record(self._file) if data_bytes is None: self._last_pos = self._file.tell() + self._consecutive_failures = 0 continue # empty placeholder record inner = cbor2.loads(data_bytes) self._last_pos = self._file.tell() + self._consecutive_failures = 0 yield inner except EOFError: # No new data yet — poll time.sleep(0.01) - except Exception as e: - log.warning("Error reading RAW record: %s", e) + except (ValueError, cbor2.CBORDecodeError, OSError) as e: + self._consecutive_failures += 1 + if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES: + log.warning("Skipping past corrupt data at offset %d after %d failures: %s", + self._last_pos, self._consecutive_failures, e) + self._last_pos += 1 + self._consecutive_failures = 0 + else: + log.debug("Error reading RAW record (attempt %d): %s", + self._consecutive_failures, e) self._file.seek(self._last_pos) - time.sleep(1) + time.sleep(0.1) + + # Clean up file handle on shutdown + if self._file: + self._file.close() + self._file = None # --------------------------------------------------------------------------- # Per-side processor diff --git a/modules/sleep-detector/main.py b/modules/sleep-detector/main.py index 67a49b13..c97f892c 100644 --- a/modules/sleep-detector/main.py +++ b/modules/sleep-detector/main.py @@ -29,9 +29,10 @@ from dataclasses import dataclass, field from typing import Optional -import struct +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) import cbor2 +from common.cbor_raw import read_raw_record import numpy as np # --------------------------------------------------------------------------- @@ -257,88 +258,19 @@ def _flush_movement(self, ts: float) -> None: self._movement_buf = [] self._last_movement_write = ts -# --------------------------------------------------------------------------- -# CBOR record reader -# --------------------------------------------------------------------------- - -def _read_raw_record(f): - """ - Manually parse one outer {seq, data} CBOR record using f.read(). - - The cbor2 C extension (_cbor2) reads files in internal 4096-byte chunks, - so cbor2.load(f) advances f.tell() by 4096 bytes regardless of the actual - record size. Since RAW file records are typically 17-5000 bytes, this causes - nearly every record to be skipped silently. - - This function parses the outer {seq: uint, data: bytes} wrapper byte-by-byte - using f.read(), keeping f.tell() accurate after each record. - - Returns the raw inner data bytes, or None for empty placeholder records - (which the Pod firmware writes as sequence number markers with data=b''). - Raises EOFError at end of file, ValueError on malformed data. - """ - b = f.read(1) - if not b: - raise EOFError - if b[0] != 0xa2: - raise ValueError('Expected outer map 0xa2, got 0x%02x' % b[0]) - if f.read(4) != b'\x63\x73\x65\x71': - raise ValueError('Expected seq key') - hdr = f.read(1) - if not hdr: - raise EOFError - if hdr[0] == 0x1a: - seq_bytes = f.read(4) - if len(seq_bytes) < 4: - raise EOFError - elif hdr[0] == 0x1b: - seq_bytes = f.read(8) - if len(seq_bytes) < 8: - raise EOFError - else: - raise ValueError('Unexpected seq encoding: 0x%02x' % hdr[0]) - if f.read(5) != b'\x64\x64\x61\x74\x61': - raise ValueError('Expected data key') - bs = f.read(1) - if not bs: - raise EOFError - ai = bs[0] & 0x1f - if ai <= 23: - length = ai - elif ai == 24: - lb = f.read(1) - if not lb: - raise EOFError - length = lb[0] - elif ai == 25: - lb = f.read(2) - if len(lb) < 2: - raise EOFError - length = struct.unpack('>H', lb)[0] - elif ai == 26: - lb = f.read(4) - if len(lb) < 4: - raise EOFError - length = struct.unpack('>I', lb)[0] - else: - raise ValueError('Unsupported length encoding: %d' % ai) - data = f.read(length) - if len(data) < length: - raise EOFError - if not data: - return None # empty placeholder record, caller should skip - return data - # --------------------------------------------------------------------------- # RAW file follower (same pattern as piezo-processor) # --------------------------------------------------------------------------- +MAX_CONSECUTIVE_FAILURES = 5 + class RawFileFollower: def __init__(self, data_dir: Path): self.data_dir = data_dir self._file = None self._path = None self._last_pos = 0 + self._consecutive_failures = 0 def _find_latest(self): candidates = sorted(self.data_dir.glob("*.RAW"), key=lambda p: p.stat().st_mtime, reverse=True) @@ -358,21 +290,32 @@ def read_records(self): self._file = open(latest, "rb") self._path = latest self._last_pos = 0 + self._consecutive_failures = 0 try: - data_bytes = _read_raw_record(self._file) + data_bytes = read_raw_record(self._file) if data_bytes is None: self._last_pos = self._file.tell() + self._consecutive_failures = 0 continue # empty placeholder record inner = cbor2.loads(data_bytes) self._last_pos = self._file.tell() + self._consecutive_failures = 0 yield inner except EOFError: time.sleep(0.5) - except Exception as e: - log.warning("Error reading RAW record: %s", e) + except (ValueError, cbor2.CBORDecodeError, OSError) as e: + self._consecutive_failures += 1 + if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES: + log.warning("Skipping past corrupt data at offset %d after %d failures: %s", + self._last_pos, self._consecutive_failures, e) + self._last_pos += 1 + self._consecutive_failures = 0 + else: + log.debug("Error reading RAW record (attempt %d): %s", + self._consecutive_failures, e) self._file.seek(self._last_pos) - time.sleep(1) + time.sleep(0.1) # Clean up file handle on shutdown if self._file: From 3903ad6978aef947cd72851c3d431af106f7df17 Mon Sep 17 00:00:00 2001 From: Jonathan Ng Date: Sat, 14 Mar 2026 16:49:14 -0700 Subject: [PATCH 3/3] fix(biometrics): address adversarial review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes from dual-agent (Optimizer + Skeptic) adversarial code review: Parser hardening (cbor_raw.py): - Validate data value major type is byte string (mt=2) - Accept all CBOR uint encodings for seq (inline through uint64) - Detect partial key reads as EOFError, not ValueError - Add length sanity cap (1MB) to prevent corrupt-length OOM - Add uint64 (ai=27) support for data byte-string length - Document protocol contract (key order, seq encoding) Error recovery (raw_follower.py — extracted from both modules): - Seek back to _last_pos on EOFError (prevents partial-record loss) - Reset _consecutive_failures on EOFError (prevents false corruption) - Swap except clause order: catch CBORDecodeError before EOFError (CBORDecodeEOF is a subclass of both — wrong branch caused infinite retry on corrupt inner records) - Guard _find_latest against FileNotFoundError race (glob vs stat) Architecture: - Extract RawFileFollower to modules/common/raw_follower.py (DRY) - Add modules/common/__init__.py for explicit package - Configurable poll_interval replaces hardcoded sleep values Pre-existing fixes: - Fix report_health connection leak (try/finally around conn.close) Co-Authored-By: seanpasino --- modules/common/__init__.py | 0 modules/common/cbor_raw.py | 64 +++++++++++++++----- modules/common/raw_follower.py | 100 ++++++++++++++++++++++++++++++++ modules/piezo-processor/main.py | 98 +++++-------------------------- modules/sleep-detector/main.py | 91 +++++------------------------ 5 files changed, 180 insertions(+), 173 deletions(-) create mode 100644 modules/common/__init__.py create mode 100644 modules/common/raw_follower.py diff --git a/modules/common/__init__.py b/modules/common/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/modules/common/cbor_raw.py b/modules/common/cbor_raw.py index 6fb4a8d8..1cc5d8fc 100644 --- a/modules/common/cbor_raw.py +++ b/modules/common/cbor_raw.py @@ -9,11 +9,19 @@ This module provides a manual parser for the outer {seq, data} CBOR wrapper that reads byte-by-byte using f.read(), keeping f.tell() accurate. +Protocol contract: Pod firmware always emits the outer map with keys in the +order "seq" then "data". The seq value is encoded as a fixed-width uint32 +(0x1a) by the embedded CBOR encoder, regardless of value. This parser +validates that contract and raises ValueError on deviation. + See: https://github.com/throwaway31265/free-sleep/pull/46 """ import struct +# Reject implausibly large data payloads (corrupt length field protection) +_MAX_DATA_LENGTH = 1_000_000 + def read_raw_record(f): """Parse one outer {seq, data} CBOR record from file object *f*. @@ -22,7 +30,7 @@ def read_raw_record(f): records (Pod firmware writes ``data=b''`` as sequence-number markers). Raises: - EOFError: End of file (no more data to read). + EOFError: End of file or incomplete record (not yet fully written). ValueError: Malformed CBOR structure. """ b = f.read(1) @@ -32,32 +40,52 @@ def read_raw_record(f): raise ValueError('Expected outer map 0xa2, got 0x%02x' % b[0]) # "seq" key — text(3) "seq" - if f.read(4) != b'\x63\x73\x65\x71': + # Pod firmware always emits keys in order: seq, then data. + seq_key = f.read(4) + if len(seq_key) < 4: + raise EOFError + if seq_key != b'\x63\x73\x65\x71': raise ValueError('Expected seq key') - # seq value — uint32 (0x1a) or uint64 (0x1b) + # seq value — Pod firmware uses fixed-width uint32 (0x1a), but we accept + # all valid CBOR unsigned integer encodings for forward compatibility. hdr = f.read(1) if not hdr: raise EOFError - if hdr[0] == 0x1a: - seq_bytes = f.read(4) - if len(seq_bytes) < 4: + mt = hdr[0] >> 5 + ai = hdr[0] & 0x1f + if mt != 0: + raise ValueError('seq must be unsigned int, got major type %d' % mt) + if ai <= 23: + pass # inline value, no additional bytes + elif ai == 24: + if len(f.read(1)) < 1: + raise EOFError + elif ai == 25: + if len(f.read(2)) < 2: + raise EOFError + elif ai == 26: + if len(f.read(4)) < 4: raise EOFError - elif hdr[0] == 0x1b: - seq_bytes = f.read(8) - if len(seq_bytes) < 8: + elif ai == 27: + if len(f.read(8)) < 8: raise EOFError else: raise ValueError('Unexpected seq encoding: 0x%02x' % hdr[0]) # "data" key — text(4) "data" - if f.read(5) != b'\x64\x64\x61\x74\x61': + data_key = f.read(5) + if len(data_key) < 5: + raise EOFError + if data_key != b'\x64\x64\x61\x74\x61': raise ValueError('Expected data key') - # data value — byte string length + # data value — must be a byte string (major type 2) bs = f.read(1) if not bs: raise EOFError + if bs[0] >> 5 != 2: + raise ValueError('data must be a byte string, got major type %d' % (bs[0] >> 5)) ai = bs[0] & 0x1f if ai <= 23: length = ai @@ -76,12 +104,20 @@ def read_raw_record(f): if len(lb) < 4: raise EOFError length = struct.unpack('>I', lb)[0] + elif ai == 27: + lb = f.read(8) + if len(lb) < 8: + raise EOFError + length = struct.unpack('>Q', lb)[0] else: - raise ValueError('Unsupported length encoding: %d' % ai) + raise ValueError('Unsupported data length encoding: %d' % ai) + + if length > _MAX_DATA_LENGTH: + raise ValueError('Implausibly large data field: %d bytes' % length) data = f.read(length) if len(data) < length: raise EOFError - if not data: - return None # empty placeholder record, caller should skip + if not data: # length==0: firmware placeholder record (sequence-number marker) + return None return data diff --git a/modules/common/raw_follower.py b/modules/common/raw_follower.py new file mode 100644 index 00000000..79e0b679 --- /dev/null +++ b/modules/common/raw_follower.py @@ -0,0 +1,100 @@ +""" +Shared RAW file follower for SleepyPod biometrics modules. + +Tails the newest .RAW file in a directory, yielding decoded CBOR records +as they are appended by the hardware daemon. +""" + +import time +import logging +from pathlib import Path +from typing import Optional + +import cbor2 + +from common.cbor_raw import read_raw_record + +log = logging.getLogger(__name__) + +MAX_CONSECUTIVE_FAILURES = 5 + + +def _safe_mtime(p: Path) -> float: + """Return mtime, or 0.0 if the file was deleted between glob and stat.""" + try: + return p.stat().st_mtime + except FileNotFoundError: + return 0.0 + + +class RawFileFollower: + """Follow the newest .RAW file, yielding decoded CBOR records. + + Args: + data_dir: Directory containing .RAW files. + shutdown_event: A threading.Event that signals shutdown. + poll_interval: Seconds to sleep when no new data is available. + """ + + def __init__(self, data_dir: Path, shutdown_event, poll_interval: float = 0.01): + self.data_dir = data_dir + self._shutdown = shutdown_event + self._poll_interval = poll_interval + self._file = None + self._path = None + self._last_pos = 0 + self._consecutive_failures = 0 + + def _find_latest(self) -> Optional[Path]: + candidates = [p for p in self.data_dir.glob("*.RAW") if _safe_mtime(p) > 0] + candidates.sort(key=_safe_mtime, reverse=True) + return candidates[0] if candidates else None + + def read_records(self): + """Yield decoded CBOR records as they arrive, sleeping between poll attempts.""" + while not self._shutdown.is_set(): + latest = self._find_latest() + if latest is None: + time.sleep(1) + continue + + if latest != self._path: + log.info("Switched to RAW file: %s", latest.name) + if self._file: + self._file.close() + self._file = open(latest, "rb") + self._path = latest + self._last_pos = 0 + self._consecutive_failures = 0 + + try: + data_bytes = read_raw_record(self._file) + if data_bytes is None: + self._last_pos = self._file.tell() + self._consecutive_failures = 0 + continue # empty placeholder record + inner = cbor2.loads(data_bytes) + self._last_pos = self._file.tell() + self._consecutive_failures = 0 + yield inner + except (ValueError, cbor2.CBORDecodeError, OSError) as e: + self._consecutive_failures += 1 + if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES: + log.warning("Skipping past corrupt data at offset %d after %d failures: %s", + self._last_pos, self._consecutive_failures, e) + self._last_pos += 1 + self._consecutive_failures = 0 + else: + log.debug("Error reading RAW record (attempt %d): %s", + self._consecutive_failures, e) + self._file.seek(self._last_pos) + time.sleep(0.1) + except EOFError: + self._file.seek(self._last_pos) + self._consecutive_failures = 0 + time.sleep(self._poll_interval) + + # Clean up file handle on shutdown + if self._file: + self._file.close() + self._file = None diff --git a/modules/piezo-processor/main.py b/modules/piezo-processor/main.py index c12b78d1..8c818a62 100644 --- a/modules/piezo-processor/main.py +++ b/modules/piezo-processor/main.py @@ -30,7 +30,7 @@ sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) import cbor2 -from common.cbor_raw import read_raw_record +from common.raw_follower import RawFileFollower import numpy as np from scipy.signal import butter, filtfilt, welch import heartpy as hp @@ -101,17 +101,19 @@ def report_health(status: str, message: str) -> None: """Write module health to sleepypod.db system_health table.""" try: conn = sqlite3.connect(str(SLEEPYPOD_DB), timeout=2.0) - with conn: - conn.execute( - """INSERT INTO system_health (component, status, message, last_checked) - VALUES ('piezo-processor', ?, ?, ?) - ON CONFLICT(component) DO UPDATE SET - status=excluded.status, - message=excluded.message, - last_checked=excluded.last_checked""", - (status, message, int(time.time())), - ) - conn.close() + try: + with conn: + conn.execute( + """INSERT INTO system_health (component, status, message, last_checked) + VALUES ('piezo-processor', ?, ?, ?) + ON CONFLICT(component) DO UPDATE SET + status=excluded.status, + message=excluded.message, + last_checked=excluded.last_checked""", + (status, message, int(time.time())), + ) + finally: + conn.close() except Exception as e: log.warning("Could not write health status: %s", e) @@ -175,76 +177,6 @@ def compute_breathing_rate(samples: np.ndarray, fs: float = SAMPLE_RATE) -> Opti log.debug("Breathing rate computation failed: %s", e) return None -# --------------------------------------------------------------------------- -# RAW file follower -# --------------------------------------------------------------------------- - -MAX_CONSECUTIVE_FAILURES = 5 - -class RawFileFollower: - """ - Follows the newest .RAW file in RAW_DATA_DIR, tailing it as new CBOR - records are appended by the hardware daemon. - """ - - def __init__(self, data_dir: Path): - self.data_dir = data_dir - self._file = None - self._path = None - self._last_pos = 0 - self._consecutive_failures = 0 - - def _find_latest(self) -> Optional[Path]: - candidates = sorted(self.data_dir.glob("*.RAW"), key=lambda p: p.stat().st_mtime, reverse=True) - return candidates[0] if candidates else None - - def read_records(self): - """Yield decoded CBOR records as they arrive. Blocks between records.""" - while not _shutdown.is_set(): - latest = self._find_latest() - if latest is None: - time.sleep(1) - continue - - if latest != self._path: - log.info("Switched to RAW file: %s", latest.name) - if self._file: - self._file.close() - self._file = open(latest, "rb") - self._path = latest - self._last_pos = 0 - self._consecutive_failures = 0 - - try: - data_bytes = read_raw_record(self._file) - if data_bytes is None: - self._last_pos = self._file.tell() - self._consecutive_failures = 0 - continue # empty placeholder record - inner = cbor2.loads(data_bytes) - self._last_pos = self._file.tell() - self._consecutive_failures = 0 - yield inner - except EOFError: - # No new data yet — poll - time.sleep(0.01) - except (ValueError, cbor2.CBORDecodeError, OSError) as e: - self._consecutive_failures += 1 - if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES: - log.warning("Skipping past corrupt data at offset %d after %d failures: %s", - self._last_pos, self._consecutive_failures, e) - self._last_pos += 1 - self._consecutive_failures = 0 - else: - log.debug("Error reading RAW record (attempt %d): %s", - self._consecutive_failures, e) - self._file.seek(self._last_pos) - time.sleep(0.1) - - # Clean up file handle on shutdown - if self._file: - self._file.close() - self._file = None # --------------------------------------------------------------------------- # Per-side processor @@ -300,7 +232,7 @@ def main() -> None: db_conn = open_biometrics_db() left = SideProcessor("left", db_conn) right = SideProcessor("right", db_conn) - follower = RawFileFollower(RAW_DATA_DIR) + follower = RawFileFollower(RAW_DATA_DIR, _shutdown, poll_interval=0.01) report_health("healthy", "piezo-processor started") diff --git a/modules/sleep-detector/main.py b/modules/sleep-detector/main.py index c97f892c..d560e75f 100644 --- a/modules/sleep-detector/main.py +++ b/modules/sleep-detector/main.py @@ -32,7 +32,7 @@ sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) import cbor2 -from common.cbor_raw import read_raw_record +from common.raw_follower import RawFileFollower import numpy as np # --------------------------------------------------------------------------- @@ -123,17 +123,19 @@ def write_movement(conn: sqlite3.Connection, side: str, def report_health(status: str, message: str) -> None: try: conn = sqlite3.connect(str(SLEEPYPOD_DB), timeout=2.0) - with conn: - conn.execute( - """INSERT INTO system_health (component, status, message, last_checked) - VALUES ('sleep-detector', ?, ?, ?) - ON CONFLICT(component) DO UPDATE SET - status=excluded.status, - message=excluded.message, - last_checked=excluded.last_checked""", - (status, message, int(time.time())), - ) - conn.close() + try: + with conn: + conn.execute( + """INSERT INTO system_health (component, status, message, last_checked) + VALUES ('sleep-detector', ?, ?, ?) + ON CONFLICT(component) DO UPDATE SET + status=excluded.status, + message=excluded.message, + last_checked=excluded.last_checked""", + (status, message, int(time.time())), + ) + finally: + conn.close() except Exception as e: log.warning("Could not write health status: %s", e) @@ -258,69 +260,6 @@ def _flush_movement(self, ts: float) -> None: self._movement_buf = [] self._last_movement_write = ts -# --------------------------------------------------------------------------- -# RAW file follower (same pattern as piezo-processor) -# --------------------------------------------------------------------------- - -MAX_CONSECUTIVE_FAILURES = 5 - -class RawFileFollower: - def __init__(self, data_dir: Path): - self.data_dir = data_dir - self._file = None - self._path = None - self._last_pos = 0 - self._consecutive_failures = 0 - - def _find_latest(self): - candidates = sorted(self.data_dir.glob("*.RAW"), key=lambda p: p.stat().st_mtime, reverse=True) - return candidates[0] if candidates else None - - def read_records(self): - while not _shutdown.is_set(): - latest = self._find_latest() - if latest is None: - time.sleep(1) - continue - - if latest != self._path: - log.info("Switched to RAW file: %s", latest.name) - if self._file: - self._file.close() - self._file = open(latest, "rb") - self._path = latest - self._last_pos = 0 - self._consecutive_failures = 0 - - try: - data_bytes = read_raw_record(self._file) - if data_bytes is None: - self._last_pos = self._file.tell() - self._consecutive_failures = 0 - continue # empty placeholder record - inner = cbor2.loads(data_bytes) - self._last_pos = self._file.tell() - self._consecutive_failures = 0 - yield inner - except EOFError: - time.sleep(0.5) - except (ValueError, cbor2.CBORDecodeError, OSError) as e: - self._consecutive_failures += 1 - if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES: - log.warning("Skipping past corrupt data at offset %d after %d failures: %s", - self._last_pos, self._consecutive_failures, e) - self._last_pos += 1 - self._consecutive_failures = 0 - else: - log.debug("Error reading RAW record (attempt %d): %s", - self._consecutive_failures, e) - self._file.seek(self._last_pos) - time.sleep(0.1) - - # Clean up file handle on shutdown - if self._file: - self._file.close() - self._file = None # --------------------------------------------------------------------------- # Main loop @@ -336,7 +275,7 @@ def main() -> None: db_conn = open_biometrics_db() left = SessionTracker(side="left", db=db_conn) right = SessionTracker(side="right", db=db_conn) - follower = RawFileFollower(RAW_DATA_DIR) + follower = RawFileFollower(RAW_DATA_DIR, _shutdown, poll_interval=0.5) report_health("healthy", "sleep-detector started")