diff --git a/README.md b/README.md index 10860dd0f..de0546e09 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,20 @@ # π RuView

- + RuView - WiFi DensePose

+> **Alpha Software** — This project is under active development. APIs, firmware behavior, and documentation may change. Known limitations:
+> - Multi-node person counting may show identical output regardless of the number of people (#249)
+> - Training pipeline on MM-Fi dataset may plateau at low PCK (#318) — hyperparameter tuning in progress
+> - No pre-trained model weights are provided; training from scratch is required
+> - ESP32-C3 and original ESP32 are not supported (single-core, insufficient for CSI DSP)
+> - Single ESP32 deployments have limited spatial resolution
+>
+> Contributions and bug reports welcome at [Issues](https://github.com/ruvnet/RuView/issues).
+
 ## **See through walls with WiFi + Ai**
 ## **Perceive the world through signals.** No cameras. No wearables. No Internet. Just physics.
@@ -14,7 +23,7 @@
 Instead of relying on cameras or cloud models, it observes whatever signals exist in a space such as WiFi, radio waves across the spectrum, motion patterns, vibration, sound, or other sensory inputs and builds an understanding of what is happening locally.
-Built on top of [RuVector](https://github.com/ruvnet/ruvector/), the project became widely known for its implementation of WiFi DensePose — a sensing technique first explored in academic research such as Carnegie Mellon University's *DensePose From WiFi* work. That research demonstrated that WiFi signals can be used to reconstruct human pose.
+Built on top of the [RuVector](https://github.com/ruvnet/ruvector/) Self-Learning Vector Memory system and [Cognitum.One](https://Cognitum.One), the project became widely known for its implementation of WiFi DensePose — a sensing technique first explored in academic research such as Carnegie Mellon University's *DensePose From WiFi* work. That research demonstrated that WiFi signals can be used to reconstruct human pose.
 RuView extends that concept into a practical edge system.
By analyzing Channel State Information (CSI) disturbances caused by human movement, RuView reconstructs body position, breathing rate, heart rate, and presence in real time using physics-based signal processing and machine learning. diff --git a/docs/adr/ADR-067-ruvector-v2.0.5-upgrade.md b/docs/adr/ADR-067-ruvector-v2.0.5-upgrade.md new file mode 100644 index 000000000..a01a5f817 --- /dev/null +++ b/docs/adr/ADR-067-ruvector-v2.0.5-upgrade.md @@ -0,0 +1,151 @@ +# ADR-067: RuVector v2.0.4 to v2.0.5 Upgrade + New Crate Adoption + +**Status:** Proposed +**Date:** 2026-03-23 +**Deciders:** @ruvnet +**Related:** ADR-016 (RuVector training pipeline integration), ADR-017 (RuVector signal + MAT integration), ADR-029 (RuvSense multistatic sensing) + +## Context + +RuView currently pins all five core RuVector crates at **v2.0.4** (from crates.io) plus a vendored `ruvector-crv` v0.1.1 and optional `ruvector-gnn` v2.0.5. The upstream RuVector workspace has moved to **v2.0.5** with meaningful improvements to the crates we depend on, and has introduced new crates that could benefit RuView's detection pipeline. 
+ +### Current Integration Map + +| RuView Module | RuVector Crate | Current Version | Purpose | +|---------------|----------------|-----------------|---------| +| `signal/subcarrier.rs` | ruvector-mincut | 2.0.4 | Graph min-cut subcarrier partitioning | +| `signal/spectrogram.rs` | ruvector-attn-mincut | 2.0.4 | Attention-gated spectrogram denoising | +| `signal/bvp.rs` | ruvector-attention | 2.0.4 | Attention-weighted BVP aggregation | +| `signal/fresnel.rs` | ruvector-solver | 2.0.4 | Fresnel geometry estimation | +| `mat/triangulation.rs` | ruvector-solver | 2.0.4 | TDoA survivor localization | +| `mat/breathing.rs` | ruvector-temporal-tensor | 2.0.4 | Tiered compressed breathing buffer | +| `mat/heartbeat.rs` | ruvector-temporal-tensor | 2.0.4 | Tiered compressed heartbeat spectrogram | +| `viewpoint/*` (4 files) | ruvector-attention | 2.0.4 | Cross-viewpoint fusion with geometric bias | +| `crv/` (optional) | ruvector-crv | 0.1.1 (vendored) | CRV protocol integration | +| `crv/` (optional) | ruvector-gnn | 2.0.5 | GNN graph topology | + +### What Changed Upstream (v2.0.4 → v2.0.5 → HEAD) + +**ruvector-mincut:** +- Flat capacity matrix + allocation reuse — **10-30% faster** for all min-cut operations +- Tier 2-3 Dynamic MinCut (ADR-124): Gomory-Hu tree construction for fast global min-cut, incremental edge insert/delete without full recomputation +- Source-anchored canonical min-cut with SHA-256 witness hashing +- Fixed: unsafe indexing removed, WASM Node.js panic from `std::time` + +**ruvector-attention / ruvector-attn-mincut:** +- Migrated to workspace versioning (no API changes) +- Documentation improvements + +**ruvector-temporal-tensor:** +- Formatting fixes only (no API changes) + +**ruvector-gnn:** +- Panic replaced with `Result` in `MultiHeadAttention` and `RuvectorLayer` constructors (breaking improvement — safer) +- Bumped to v2.0.5 + +**sona (new — Self-Optimizing Neural Architecture):** +- v0.1.6 → v0.1.8: state persistence 
(`loadState`/`saveState`), trajectory counter fix +- Micro-LoRA and Base-LoRA for instant and background learning +- EWC++ (Elastic Weight Consolidation) to prevent catastrophic forgetting +- ReasoningBank pattern extraction and similarity search +- WASM support for edge devices + +**ruvector-coherence (new):** +- Spectral coherence scoring for graph index health +- Fiedler eigenvalue estimation, effective resistance sampling +- HNSW health monitoring with alerts +- Batch evaluation of attention mechanism quality + +**ruvector-core (new):** +- ONNX embedding support for real semantic embeddings +- HNSW index with SIMD-accelerated distance metrics +- Quantization (4-32x memory reduction) +- Arena allocator for cache-optimized operations + +## Decision + +### Phase 1: Version Bump (Low Risk) + +Bump the 5 core crates from v2.0.4 to v2.0.5 in the workspace `Cargo.toml`: + +```toml +ruvector-mincut = "2.0.5" # was 2.0.4 — 10-30% faster, safer +ruvector-attn-mincut = "2.0.5" # was 2.0.4 — workspace versioning +ruvector-temporal-tensor = "2.0.5" # was 2.0.4 — fmt only +ruvector-solver = "2.0.5" # was 2.0.4 — workspace versioning +ruvector-attention = "2.0.5" # was 2.0.4 — workspace versioning +``` + +**Expected impact:** The mincut performance improvement directly benefits `signal/subcarrier.rs` which runs subcarrier graph partitioning every tick. 10-30% faster partitioning reduces per-frame CPU cost. + +### Phase 2: Add ruvector-coherence (Medium Value) + +Add `ruvector-coherence` with `spectral` feature to `wifi-densepose-ruvector`: + +**Use case:** Replace or augment the custom phase coherence logic in `viewpoint/coherence.rs` with spectral graph coherence scoring. The current implementation uses phasor magnitude for phase coherence — spectral Fiedler estimation would provide a more robust measure of multi-node CSI consistency, especially for detecting when a node's signal quality degrades. 
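For context on what a spectral score would complement: the phasor-magnitude phase coherence mentioned above reduces to the mean resultant length of the per-subcarrier phases. A minimal, self-contained sketch — not the actual `viewpoint/coherence.rs` code, whose details may differ:

```rust
/// Mean resultant length of a set of phases (radians):
/// |sum(e^{i*theta})| / n. Returns 1.0 for perfectly aligned phases
/// and values near 0.0 for uniformly scattered ones.
fn phasor_coherence(phases: &[f64]) -> f64 {
    if phases.is_empty() {
        return 0.0;
    }
    let (mut re, mut im) = (0.0f64, 0.0f64);
    for &theta in phases {
        re += theta.cos();
        im += theta.sin();
    }
    (re * re + im * im).sqrt() / phases.len() as f64
}

fn main() {
    // Tightly clustered phases -> coherence near 1.0.
    let aligned = [0.10, 0.12, 0.09, 0.11];
    // Phases spread around the circle -> coherence near 0.0.
    let scattered = [0.0, 1.57, 3.14, 4.71];
    println!("aligned:   {:.3}", phasor_coherence(&aligned));
    println!("scattered: {:.3}", phasor_coherence(&scattered));
}
```

A spectral score would sit alongside this scalar, flagging structural changes in the multi-node CSI graph (node dropout, new reflector) that a phase-only metric can miss.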
+ +**Integration point:** `viewpoint/coherence.rs` — add `SpectralCoherenceScore` as a secondary coherence metric alongside existing phase phasor coherence. Use spectral gap estimation to detect structural changes in the multi-node CSI graph (e.g., a node dropping out or a new reflector appearing). + +### Phase 3: Add SONA for Adaptive Learning (High Value) + +Replace the logistic regression adaptive classifier in the sensing server with a SONA-backed learning engine: + +**Current state:** The sensing server's adaptive training (`POST /api/v1/adaptive/train`) uses a hand-rolled logistic regression on 15 CSI features. It requires explicit labeled recordings and provides no cross-session persistence. + +**Proposed improvement:** Use `sona::SonaEngine` to: +1. **Learn from implicit feedback** — trajectory tracking on person-count decisions (was the count stable? did the user correct it?) +2. **Persist across sessions** — `saveState()`/`loadState()` replaces the current `adaptive_model.json` +3. **Pattern matching** — `find_patterns()` enables "this CSI signature looks like room X where we learned Y" +4. **Prevent forgetting** — EWC++ ensures learning in a new room doesn't overwrite patterns from previous rooms + +**Integration point:** New `adaptive_sona.rs` module in `wifi-densepose-sensing-server`, behind a `sona` feature flag. The existing logistic regression remains the default. + +### Phase 4: Evaluate ruvector-core for CSI Embeddings (Exploratory) + +**Current state:** The person detection pipeline uses hand-crafted features (variance, change_points, motion_band_power, spectral_power) with fixed normalization ranges. + +**Potential:** Use `ruvector-core`'s ONNX embedding support to generate learned CSI embeddings that capture room geometry, person count, and activity patterns in a single vector. This would enable: +- Similarity search: "is this CSI frame similar to known 2-person patterns?" 
+- Transfer learning: embeddings learned in one room partially transfer to similar rooms +- Quantized storage: 4-32x memory reduction for pattern databases + +**Status:** Exploratory — requires training data collection and embedding model design. Not a near-term target. + +## Consequences + +### Positive +- **Phase 1:** Free 10-30% performance gain in subcarrier partitioning. Security fixes (unsafe indexing, WASM panic). Zero API changes required. +- **Phase 2:** More robust multi-node coherence detection. Helps with the "flickering persons" issue (#292) by providing a second opinion on signal quality. +- **Phase 3:** Fundamentally improves the adaptive learning pipeline. Users no longer need to manually record labeled data — the system learns from ongoing use. +- **Phase 4:** Path toward real ML-based detection instead of heuristic thresholds. + +### Negative +- **Phase 1:** Minimal risk — semver minor bump, no API breaks. +- **Phase 2:** Adds a dependency. Spectral computation has O(n) cost per tick for Fiedler estimation (n = number of subcarriers, typically 56-128). Acceptable. +- **Phase 3:** SONA adds ~200KB to the binary. The learning loop needs careful tuning to avoid adapting to noise. +- **Phase 4:** Requires significant research and training data. Not guaranteed to outperform tuned heuristics for WiFi CSI. + +### Risks +- `ruvector-gnn` v2.0.5 changed constructors from panic to `Result` — any existing `crv` feature users need to handle the `Result`. Our vendored `ruvector-crv` may need updates. +- SONA's WASM support is experimental — keep it behind a feature flag until validated. 
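The constructor change called out in the risks can be illustrated as follows. This is a sketch with made-up types — `MultiHeadAttention` here is *not* the real ruvector-gnn API; it only mirrors the panic-to-`Result` pattern that `crv` feature users will need to handle:

```rust
// Hypothetical stand-in for the ruvector-gnn constructors described above —
// NOT the real API. It only demonstrates the v2.0.4 -> v2.0.5 pattern:
// invalid configurations now surface as Err instead of panicking.
#[derive(Debug)]
struct MultiHeadAttention {
    dim: usize,
    heads: usize,
}

impl MultiHeadAttention {
    /// v2.0.5-style fallible constructor.
    fn new(dim: usize, heads: usize) -> Result<Self, String> {
        if heads == 0 || dim % heads != 0 {
            return Err(format!("dim {dim} is not divisible into {heads} heads"));
        }
        Ok(Self { dim, heads })
    }
}

fn main() {
    // Call sites that previously assumed infallible construction must now
    // propagate or handle the error.
    let attn = MultiHeadAttention::new(64, 8).expect("valid config");
    assert_eq!(attn.dim / attn.heads, 8);

    // What used to panic is now a recoverable error.
    assert!(MultiHeadAttention::new(64, 5).is_err());
    assert!(MultiHeadAttention::new(64, 0).is_err());
}
```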
+ +## Implementation Plan + +| Phase | Scope | Effort | Priority | +|-------|-------|--------|----------| +| 1 | Bump 5 crates to v2.0.5 | 1 hour | High — free perf + security | +| 2 | Add ruvector-coherence | 1 day | Medium — improves multi-node stability | +| 3 | SONA adaptive learning | 3 days | Medium — replaces manual training workflow | +| 4 | CSI embeddings via ruvector-core | 1-2 weeks | Low — exploratory research | + +## Vendor Submodule + +The `vendor/ruvector` git submodule has been updated from commit `f8f2c60` (v2.0.4 era) to `51a3557` (latest `origin/main`). This provides local reference for the full upstream source when developing Phases 2-4. + +## References + +- Upstream repo: https://github.com/ruvnet/ruvector +- ADR-124 (Dynamic MinCut): `vendor/ruvector/docs/adr/ADR-124*.md` +- SONA docs: `vendor/ruvector/crates/sona/src/lib.rs` +- ruvector-coherence spectral: `vendor/ruvector/crates/ruvector-coherence/src/spectral.rs` +- ruvector-core embeddings: `vendor/ruvector/crates/ruvector-core/src/embeddings.rs` diff --git a/docs/adr/ADR-068-per-node-state-pipeline.md b/docs/adr/ADR-068-per-node-state-pipeline.md new file mode 100644 index 000000000..4438b714a --- /dev/null +++ b/docs/adr/ADR-068-per-node-state-pipeline.md @@ -0,0 +1,182 @@ +# ADR-068: Per-Node State Pipeline for Multi-Node Sensing + +| Field | Value | +|------------|-------------------------------------| +| Status | Accepted | +| Date | 2026-03-27 | +| Authors | rUv, claude-flow | +| Drivers | #249, #237, #276, #282 | +| Supersedes | — | + +## Context + +The sensing server (`wifi-densepose-sensing-server`) was originally designed for +single-node operation. 
When multiple ESP32 nodes send CSI frames simultaneously,
+all data is mixed into a single shared pipeline:
+
+- **One** `frame_history` VecDeque for all nodes
+- **One** `smoothed_person_score` / `smoothed_motion` / vital sign buffers
+- **One** baseline and debounce state
+
+This means the classification, person count, and vital signs reported to the UI
+are an uncontrolled aggregate of all nodes' data. The result: the detection
+window shows identical output regardless of how many nodes are deployed, where
+people stand, or how many people are in the room (#249 — 24 comments, the most
+reported issue).
+
+### Root Cause Verified
+
+Investigation of `AppStateInner` (main.rs lines 279-367) confirmed:
+
+| Shared field | Impact |
+|---------------------------|--------------------------------------------|
+| `frame_history` | Temporal analysis mixes all nodes' CSI data |
+| `smoothed_person_score` | Person count aggregates all nodes |
+| `smoothed_motion` | Motion classification undifferentiated |
+| `smoothed_hr` / `br` | Vital signs are global, not per-node |
+| `baseline_motion` | Adaptive baseline learned from mixed data |
+| `debounce_counter` | All nodes share debounce state |
+
+## Decision
+
+Introduce **per-node state tracking** via a `HashMap<u8, NodeState>` in
+`AppStateInner`. Each ESP32 node (identified by its `node_id` byte) gets an
+independent sensing pipeline with its own temporal history, smoothing buffers,
+baseline, and classification state.
+
+### Architecture
+
+```
+ ┌─────────────────────────────────────────┐
+ UDP frames │ AppStateInner │
+ ───────────► │ │
+ node_id=1 ──► │ node_states: HashMap<u8, NodeState> │
+ node_id=2 ──► │ ├── 1: NodeState { frame_history, │
+ node_id=3 ──► │ │ smoothed_motion, vitals, ... }│
+ │ ├── 2: NodeState { ... } │
+ │ └── 3: NodeState { ...
} │
+ │ │
+ │ ┌── Per-Node Pipeline ──┐ │
+ │ │ extract_features() │ │
+ │ │ smooth_and_classify() │ │
+ │ │ smooth_vitals() │ │
+ │ │ score_to_person_count()│ │
+ │ └────────────────────────┘ │
+ │ │
+ │ ┌── Multi-Node Fusion ──┐ │
+ │ │ Aggregate person count │ │
+ │ │ Per-node classification│ │
+ │ │ All-nodes WebSocket msg│ │
+ │ └────────────────────────┘ │
+ │ │
+ │ ──► WebSocket broadcast (sensing_update) │
+ └─────────────────────────────────────────┘
+```
+
+### NodeState Struct
+
+```rust
+struct NodeState {
+    frame_history: VecDeque<Vec<f64>>,
+    smoothed_person_score: f64,
+    prev_person_count: usize,
+    smoothed_motion: f64,
+    current_motion_level: String,
+    debounce_counter: u32,
+    debounce_candidate: String,
+    baseline_motion: f64,
+    baseline_frames: u64,
+    smoothed_hr: f64,
+    smoothed_br: f64,
+    smoothed_hr_conf: f64,
+    smoothed_br_conf: f64,
+    hr_buffer: VecDeque<f64>,
+    br_buffer: VecDeque<f64>,
+    rssi_history: VecDeque<f64>,
+    vital_detector: VitalSignDetector,
+    latest_vitals: VitalSigns,
+    last_frame_time: Option<Instant>,
+    edge_vitals: Option<VitalSigns>,
+}
+```
+
+### Multi-Node Aggregation
+
+- **Person count**: Sum of per-node `prev_person_count` for active nodes
+  (seen within last 10 seconds).
+- **Classification**: Per-node classification included in `SensingUpdate.nodes`.
+- **Vital signs**: Per-node vital signs; UI can render per-node or aggregate.
+- **Signal field**: Generated from the most-recently-updated node's features.
+- **Stale nodes**: Nodes with no frame for >10 seconds are excluded from
+  aggregation and marked offline (consistent with PR #300).
+
+### Backward Compatibility
+
+- The simulated data path (`simulated_data_task`) continues using global state.
+- Single-node deployments behave identically (HashMap has one entry).
+- The WebSocket message format (`sensing_update`) remains the same but the
+  `nodes` array now contains all active nodes, and `estimated_persons` reflects
+  the cross-node aggregate.
+- The edge vitals path (#323 fix) also uses per-node state.
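The per-node dispatch described above can be sketched in a few lines. This is a simplified illustration (only the history buffer of `NodeState` is shown, and the real server wraps this in its async state handling):

```rust
use std::collections::{HashMap, VecDeque};

/// Simplified per-node state: only the history buffer is shown here;
/// the full struct carries smoothing, baseline, and vital-sign fields.
#[derive(Default)]
struct NodeState {
    frame_history: VecDeque<Vec<f64>>,
}

#[derive(Default)]
struct AppStateInner {
    node_states: HashMap<u8, NodeState>,
}

impl AppStateInner {
    /// Route an incoming CSI frame to its node's independent pipeline,
    /// creating the node's state on first contact and capping history
    /// at 100 frames (cf. the ~50 KB-per-node figure in this ADR).
    fn ingest(&mut self, node_id: u8, amplitudes: Vec<f64>) {
        let state = self.node_states.entry(node_id).or_default();
        state.frame_history.push_back(amplitudes);
        while state.frame_history.len() > 100 {
            state.frame_history.pop_front();
        }
    }
}

fn main() {
    let mut app = AppStateInner::default();
    app.ingest(1, vec![0.1; 56]);
    app.ingest(2, vec![0.2; 56]);
    // Two nodes -> two independent pipelines, no cross-contamination.
    assert_eq!(app.node_states.len(), 2);
    assert_eq!(app.node_states[&1].frame_history.len(), 1);
}
```

The `entry(...).or_default()` pattern gives lazy creation per `node_id`, so single-node deployments keep exactly one entry and behave as before.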
+ +## Scaling Characteristics + +| Nodes | Per-Node Memory | Total Overhead | Notes | +|-------|----------------|----------------|-------| +| 1 | ~50 KB | ~50 KB | Identical to current | +| 3 | ~50 KB | ~150 KB | Typical home setup | +| 10 | ~50 KB | ~500 KB | Small office | +| 50 | ~50 KB | ~2.5 MB | Building floor | +| 100 | ~50 KB | ~5 MB | Large deployment | +| 256 | ~50 KB | ~12.8 MB | Max (u8 node_id) | + +Memory is dominated by `frame_history` (100 frames x ~500 bytes each = ~50 KB +per node). This scales linearly and fits comfortably in server memory even at +256 nodes. + +## QEMU Validation + +The existing QEMU swarm infrastructure (ADR-062, `scripts/qemu_swarm.py`) +supports multi-node simulation with configurable topologies: + +- `star`: Central coordinator + sensor nodes +- `mesh`: Fully connected peer network +- `line`: Sequential chain +- `ring`: Circular topology + +Each QEMU instance runs with a unique `node_id` via NVS provisioning. The +swarm health validator (`scripts/swarm_health.py`) checks per-node UART output. + +Validation plan: +1. QEMU swarm with 3-5 nodes in mesh topology +2. Verify server produces distinct per-node classifications +3. Verify aggregate person count reflects multi-node contributions +4. 
Verify stale-node eviction after timeout + +## Consequences + +### Positive +- Each node's CSI data is processed independently — no cross-contamination +- Person count scales with the number of deployed nodes +- Vital signs are per-node, enabling room-level health monitoring +- Foundation for spatial localization (per-node positions + triangulation) +- Scales to 256 nodes with <13 MB memory overhead + +### Negative +- Slightly more memory per node (~50 KB each) +- `smooth_and_classify_node` function duplicates some logic from global version +- Per-node `VitalSignDetector` instances add CPU cost proportional to node count + +### Risks +- Node ID collisions (mitigated by NVS persistence since v0.5.0) +- HashMap growth without cleanup (mitigated by stale-node eviction) + +## References + +- Issue #249: Detection window same regardless (24 comments) +- Issue #237: Same display for 0/1/2 people (12 comments) +- Issue #276: Only one can be detected (8 comments) +- Issue #282: Detection fail (5 comments) +- PR #295: Hysteresis smoothing (partial mitigation) +- PR #300: ESP32 offline detection after 5s +- ADR-062: QEMU Swarm Configurator diff --git a/pyproject.toml b/pyproject.toml index bb44b2a67..aa03506b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -185,7 +185,7 @@ package-dir = {"" = "."} [tool.setuptools.packages.find] where = ["."] -include = ["src*"] +include = ["wifi_densepose*", "src*"] exclude = ["tests*", "docs*", "scripts*"] [tool.setuptools.package-data] diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/Cargo.toml b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/Cargo.toml index ee3ce0bef..a76e6f1c1 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/Cargo.toml +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/Cargo.toml @@ -43,5 +43,8 @@ clap = { workspace = true } # Multi-BSSID WiFi scanning pipeline (ADR-022 Phase 3) wifi-densepose-wifiscan = { version = 
"0.3.0", path = "../wifi-densepose-wifiscan" } +# Signal processing with RuvSense pose tracker (accuracy sprint) +wifi-densepose-signal = { version = "0.3.0", path = "../wifi-densepose-signal" } + [dev-dependencies] tempfile = "3.10" diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs index 80d2364d5..b89cb58cf 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs @@ -10,6 +10,10 @@ //! //! The trained model is serialised as JSON and hot-loaded at runtime so that //! the classification thresholds adapt to the specific room and ESP32 placement. +//! +//! Classes are discovered dynamically from training data filenames instead of +//! being hardcoded, so new activity classes can be added just by recording data +//! with the appropriate filename convention. use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -20,9 +24,8 @@ use std::path::{Path, PathBuf}; /// Extended feature vector: 7 server features + 8 subcarrier-derived features = 15. const N_FEATURES: usize = 15; -/// Activity classes we recognise. -pub const CLASSES: &[&str] = &["absent", "present_still", "present_moving", "active"]; -const N_CLASSES: usize = 4; +/// Default class names for backward compatibility with old saved models. +const DEFAULT_CLASSES: &[&str] = &["absent", "present_still", "present_moving", "active"]; /// Extract extended feature vector from a JSONL frame (features + raw amplitudes). pub fn features_from_frame(frame: &serde_json::Value) -> [f64; N_FEATURES] { @@ -124,8 +127,9 @@ pub struct ClassStats { pub struct AdaptiveModel { /// Per-class feature statistics (centroid + spread). 
pub class_stats: Vec<ClassStats>,
-    /// Logistic regression weights: [N_CLASSES x (N_FEATURES + 1)] (last = bias).
-    pub weights: Vec<[f64; N_FEATURES + 1]>,
+    /// Logistic regression weights: [n_classes x (N_FEATURES + 1)] (last = bias).
+    /// Dynamic: the outer Vec length equals the number of discovered classes.
+    pub weights: Vec<Vec<f64>>,
     /// Global feature normalisation: mean and stddev across all training data.
     pub global_mean: [f64; N_FEATURES],
     pub global_std: [f64; N_FEATURES],
@@ -133,27 +137,38 @@ pub struct AdaptiveModel {
     pub trained_frames: usize,
     pub training_accuracy: f64,
     pub version: u32,
+    /// Dynamically discovered class names (in index order).
+    #[serde(default = "default_class_names")]
+    pub class_names: Vec<String>,
+}
+
+/// Backward-compatible fallback for models saved without class_names.
+fn default_class_names() -> Vec<String> {
+    DEFAULT_CLASSES.iter().map(|s| s.to_string()).collect()
 }
 
 impl Default for AdaptiveModel {
     fn default() -> Self {
+        let n_classes = DEFAULT_CLASSES.len();
         Self {
             class_stats: Vec::new(),
-            weights: vec![[0.0; N_FEATURES + 1]; N_CLASSES],
+            weights: vec![vec![0.0; N_FEATURES + 1]; n_classes],
             global_mean: [0.0; N_FEATURES],
             global_std: [1.0; N_FEATURES],
             trained_frames: 0,
             training_accuracy: 0.0,
             version: 1,
+            class_names: default_class_names(),
         }
     }
 }
 
 impl AdaptiveModel {
     /// Classify a raw feature vector. Returns (class_label, confidence).
-    pub fn classify(&self, raw_features: &[f64; N_FEATURES]) -> (&'static str, f64) {
-        if self.weights.is_empty() || self.class_stats.is_empty() {
-            return ("present_still", 0.5);
+    pub fn classify(&self, raw_features: &[f64; N_FEATURES]) -> (String, f64) {
+        let n_classes = self.weights.len();
+        if n_classes == 0 || self.class_stats.is_empty() {
+            return ("present_still".to_string(), 0.5);
        }
 
         // Normalise features.
@@ -163,8 +178,8 @@ impl AdaptiveModel {
         }
 
         // Compute logits: w·x + b for each class.
-        let mut logits = [0.0f64; N_CLASSES];
-        for c in 0..N_CLASSES.min(self.weights.len()) {
+        let mut logits: Vec<f64> = vec![0.0; n_classes];
+        for c in 0..n_classes {
             let w = &self.weights[c];
             let mut z = w[N_FEATURES]; // bias
             for i in 0..N_FEATURES {
@@ -176,8 +191,8 @@ impl AdaptiveModel {
 
         // Softmax.
         let max_logit = logits.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
         let exp_sum: f64 = logits.iter().map(|z| (z - max_logit).exp()).sum();
-        let mut probs = [0.0f64; N_CLASSES];
-        for c in 0..N_CLASSES {
+        let mut probs: Vec<f64> = vec![0.0; n_classes];
+        for c in 0..n_classes {
             probs[c] = ((logits[c] - max_logit).exp()) / exp_sum;
         }
@@ -185,7 +200,11 @@ impl AdaptiveModel {
         let (best_c, best_p) = probs.iter().enumerate()
             .max_by(|a, b| a.1.partial_cmp(b.1).unwrap())
             .unwrap();
-        let label = if best_c < CLASSES.len() { CLASSES[best_c] } else { "present_still" };
+        let label = if best_c < self.class_names.len() {
+            self.class_names[best_c].clone()
+        } else {
+            "present_still".to_string()
+        };
         (label, *best_p)
     }
@@ -228,48 +247,88 @@ fn load_recording(path: &Path, class_idx: usize) -> Vec {
     }).collect()
 }
 
-/// Map a recording filename to a class index.
-fn classify_recording_name(name: &str) -> Option<usize> {
+/// Map a recording filename to a class name (String).
+/// Returns the discovered class name for the file, or None if it cannot be determined.
+fn classify_recording_name(name: &str) -> Option<String> {
     let lower = name.to_lowercase();
-    if lower.contains("empty") || lower.contains("absent") { Some(0) }
-    else if lower.contains("still") || lower.contains("sitting") || lower.contains("standing") { Some(1) }
-    else if lower.contains("walking") || lower.contains("moving") { Some(2) }
-    else if lower.contains("active") || lower.contains("exercise") || lower.contains("running") { Some(3) }
-    else { None }
+    // Strip "train_" prefix and ".jsonl" suffix, then extract the class label.
+    // Convention: train_<class>_*.jsonl
+    // The class is the first segment after "train_" that matches a known pattern,
+    // or the entire middle portion if no pattern matches.
+
+    // Check common patterns first for backward compat
+    if lower.contains("empty") || lower.contains("absent") { return Some("absent".into()); }
+    if lower.contains("still") || lower.contains("sitting") || lower.contains("standing") { return Some("present_still".into()); }
+    if lower.contains("walking") || lower.contains("moving") { return Some("present_moving".into()); }
+    if lower.contains("active") || lower.contains("exercise") || lower.contains("running") { return Some("active".into()); }
+
+    // Fallback: extract class from filename structure train_<class>_*.jsonl
+    let stem = lower.trim_start_matches("train_").trim_end_matches(".jsonl");
+    let class_name = stem.split('_').next().unwrap_or(stem);
+    if !class_name.is_empty() {
+        Some(class_name.to_string())
+    } else {
+        None
+    }
 }
 
 /// Train a model from labeled JSONL recordings in a directory.
 ///
-/// Recordings are matched to classes by filename pattern:
-/// - `*empty*` / `*absent*` → absent (0)
-/// - `*still*` / `*sitting*` → present_still (1)
-/// - `*walking*` / `*moving*` → present_moving (2)
-/// - `*active*` / `*exercise*`→ active (3)
+/// Recordings are matched to classes by filename pattern. Classes are discovered
+/// dynamically from the training data filenames:
+/// - `*empty*` / `*absent*` → absent
+/// - `*still*` / `*sitting*` → present_still
+/// - `*walking*` / `*moving*` → present_moving
+/// - `*active*` / `*exercise*` → active
+/// - Any other `train_<class>_*.jsonl` → `<class>`
 pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, String> {
-    // Scan for train_* files.
-    let mut samples: Vec = Vec::new();
-    let entries = std::fs::read_dir(recordings_dir)
-        .map_err(|e| format!("Cannot read {}: {}", recordings_dir.display(), e))?;
-
-    for entry in entries.flatten() {
+    // First pass: scan filenames to discover all unique class names.
+    let entries: Vec<_> = std::fs::read_dir(recordings_dir)
+        .map_err(|e| format!("Cannot read {}: {}", recordings_dir.display(), e))?
+        .flatten()
+        .collect();
+
+    let mut class_map: HashMap<String, usize> = HashMap::new();
+    let mut class_names: Vec<String> = Vec::new();
+
+    // Collect (entry, class_name) pairs for files that match.
+    let mut file_classes: Vec<(PathBuf, String, String)> = Vec::new(); // (path, fname, class_name)
+    for entry in &entries {
         let fname = entry.file_name().to_string_lossy().to_string();
         if !fname.starts_with("train_") || !fname.ends_with(".jsonl") { continue; }
-        if let Some(class_idx) = classify_recording_name(&fname) {
-            let loaded = load_recording(&entry.path(), class_idx);
-            eprintln!("  Loaded {}: {} frames → class '{}'",
-                fname, loaded.len(), CLASSES[class_idx]);
-            samples.extend(loaded);
+        if let Some(class_name) = classify_recording_name(&fname) {
+            if !class_map.contains_key(&class_name) {
+                let idx = class_names.len();
+                class_map.insert(class_name.clone(), idx);
+                class_names.push(class_name.clone());
+            }
+            file_classes.push((entry.path(), fname, class_name));
         }
     }
 
+    let n_classes = class_names.len();
+    if n_classes == 0 {
+        return Err("No training samples found. Record data with train_* prefix.".into());
+    }
+
+    // Second pass: load recordings with the discovered class indices.
+    let mut samples: Vec = Vec::new();
+    for (path, fname, class_name) in &file_classes {
+        let class_idx = class_map[class_name];
+        let loaded = load_recording(path, class_idx);
+        eprintln!("  Loaded {}: {} frames → class '{}'",
+            fname, loaded.len(), class_name);
+        samples.extend(loaded);
+    }
+
    if samples.is_empty() { return Err("No training samples found.
Record data with train_* prefix.".into()); } let n = samples.len(); - eprintln!("Total training samples: {n}"); + eprintln!("Total training samples: {n} across {n_classes} classes: {:?}", class_names); // ── Compute global normalisation stats ── let mut global_mean = [0.0f64; N_FEATURES]; @@ -289,9 +348,9 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result Result Result Result> = vec![vec![0.0f64; N_FEATURES + 1]; n_classes]; let lr = 0.1; let epochs = 200; let batch_size = 32; @@ -348,19 +407,19 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result> = vec![vec![0.0f64; N_FEATURES + 1]; n_classes]; for (x, target) in batch { // Forward: softmax. - let mut logits = [0.0f64; N_CLASSES]; - for c in 0..N_CLASSES { + let mut logits: Vec = vec![0.0; n_classes]; + for c in 0..n_classes { logits[c] = weights[c][N_FEATURES]; // bias for i in 0..N_FEATURES { logits[c] += weights[c][i] * x[i]; @@ -368,8 +427,8 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result = vec![0.0; n_classes]; + for c in 0..n_classes { probs[c] = ((logits[c] - max_l).exp()) / exp_sum; } @@ -377,7 +436,7 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result Result Result = vec![0.0; n_classes]; + for c in 0..n_classes { logits[c] = weights[c][N_FEATURES]; for i in 0..N_FEATURES { logits[c] += weights[c][i] * x[i]; @@ -422,12 +481,12 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result = vec![0.0; n_classes]; + for c in 0..n_classes { logits[c] = weights[c][N_FEATURES]; for i in 0..N_FEATURES { logits[c] += weights[c][i] * x[i]; @@ -438,9 +497,9 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result Result FieldModelConfig { + FieldModelConfig { + n_links: 1, + ..FieldModelConfig::default() + } +} + +/// Estimate occupancy using the FieldModel when calibrated, falling back +/// to the score-based heuristic otherwise. 
+///
+/// Prefers `estimate_occupancy()` (eigenvalue-based) when the model is
+/// calibrated and enough frames are available. Falls back to perturbation
+/// energy thresholds, then to the score heuristic.
+pub fn occupancy_or_fallback(
+    field: &FieldModel,
+    frame_history: &VecDeque<Vec<f64>>,
+    smoothed_score: f64,
+    prev_count: usize,
+) -> usize {
+    match field.status() {
+        CalibrationStatus::Fresh | CalibrationStatus::Stale => {
+            let frames: Vec<Vec<f64>> = frame_history
+                .iter()
+                .rev()
+                .take(OCCUPANCY_WINDOW)
+                .cloned()
+                .collect();
+
+            if frames.is_empty() {
+                return score_to_person_count(smoothed_score, prev_count);
+            }
+
+            // Try eigenvalue-based occupancy first (best accuracy).
+            match field.estimate_occupancy(&frames) {
+                Ok(count) => return count,
+                Err(_) => {} // fall through to perturbation energy
+            }
+
+            // Fallback: perturbation energy thresholds.
+            // FieldModel expects [n_links][n_subcarriers] — we use n_links=1.
+            let observation = vec![frames[0].clone()];
+            match field.extract_perturbation(&observation) {
+                Ok(perturbation) => {
+                    if perturbation.total_energy > ENERGY_THRESH_3 {
+                        3
+                    } else if perturbation.total_energy > ENERGY_THRESH_2 {
+                        2
+                    } else if perturbation.total_energy > 1.0 {
+                        1
+                    } else {
+                        0
+                    }
+                }
+                Err(_) => score_to_person_count(smoothed_score, prev_count),
+            }
+        }
+        _ => score_to_person_count(smoothed_score, prev_count),
+    }
+}
+
+/// Feed the latest frame to the FieldModel during calibration collection.
+///
+/// Only acts when the model status is `Collecting`. Wraps the latest frame
+/// as a single-link observation (n_links=1) and feeds it.
+pub fn maybe_feed_calibration(field: &mut FieldModel, frame_history: &VecDeque<Vec<f64>>) {
+    if field.status() != CalibrationStatus::Collecting {
+        return;
+    }
+    if let Some(latest) = frame_history.back() {
+        // Single-link observation: [1][n_subcarriers]
+        let observations = vec![latest.clone()];
+        if let Err(e) = field.feed_calibration(&observations) {
+            tracing::debug!("FieldModel calibration feed: {e}");
+        }
+    }
+}
+
+/// Parse node positions from a semicolon-delimited string.
+///
+/// Format: `"x,y,z;x,y,z;..."` where each coordinate is an `f32`.
+/// Malformed entries are skipped with a warning log.
+pub fn parse_node_positions(input: &str) -> Vec<[f32; 3]> {
+    if input.is_empty() {
+        return Vec::new();
+    }
+    input
+        .split(';')
+        .enumerate()
+        .filter_map(|(idx, triplet)| {
+            let parts: Vec<&str> = triplet.split(',').collect();
+            if parts.len() != 3 {
+                tracing::warn!("Skipping malformed node position entry {idx}: '{triplet}' (expected x,y,z)");
+                return None;
+            }
+            match (parts[0].parse::<f32>(), parts[1].parse::<f32>(), parts[2].parse::<f32>()) {
+                (Ok(x), Ok(y), Ok(z)) => Some([x, y, z]),
+                _ => {
+                    tracing::warn!("Skipping unparseable node position entry {idx}: '{triplet}'");
+                    None
+                }
+            }
+        })
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_node_positions() {
+        let positions = parse_node_positions("0,0,1.5;3,0,1.5;1.5,3,1.5");
+        assert_eq!(positions.len(), 3);
+        assert_eq!(positions[0], [0.0, 0.0, 1.5]);
+        assert_eq!(positions[1], [3.0, 0.0, 1.5]);
+        assert_eq!(positions[2], [1.5, 3.0, 1.5]);
+    }
+
+    #[test]
+    fn test_parse_node_positions_empty() {
+        let positions = parse_node_positions("");
+        assert!(positions.is_empty());
+    }
+
+    #[test]
+    fn test_parse_node_positions_invalid() {
+        let positions = parse_node_positions("abc;1,2,3");
+        assert_eq!(positions.len(), 1);
+        assert_eq!(positions[0], [1.0, 2.0, 3.0]);
+    }
+
+    #[test]
+    fn test_parse_node_positions_partial_triplet() {
+        let positions = parse_node_positions("1,2;3,4,5");
assert_eq!(positions.len(), 1); + assert_eq!(positions[0], [3.0, 4.0, 5.0]); + } +} diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs index 7497c95a0..fd4f6c2b8 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs @@ -9,14 +9,17 @@ //! Replaces both ws_server.py and the Python HTTP server. mod adaptive_classifier; +mod field_bridge; +mod multistatic_bridge; mod rvf_container; mod rvf_pipeline; +mod tracker_bridge; mod vital_signs; // Training pipeline modules (exposed via lib.rs) use wifi_densepose_sensing_server::{graph_transformer, trainer, dataset, embedding}; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; @@ -52,6 +55,11 @@ use wifi_densepose_wifiscan::{ }; use wifi_densepose_wifiscan::parse_netsh_output as parse_netsh_bssid_output; +// Accuracy sprint: Kalman tracker, multistatic fusion, field model +use wifi_densepose_signal::ruvsense::pose_tracker::PoseTracker; +use wifi_densepose_signal::ruvsense::multistatic::{MultistaticFuser, MultistaticConfig}; +use wifi_densepose_signal::ruvsense::field_model::{FieldModel, CalibrationStatus}; + // ── CLI ────────────────────────────────────────────────────────────────────── #[derive(Parser, Debug)] @@ -144,6 +152,14 @@ struct Args { /// Build fingerprint index from embeddings (env|activity|temporal|person) #[arg(long, value_name = "TYPE")] build_index: Option, + + /// Node positions for multistatic fusion (format: "x,y,z;x,y,z;...") + #[arg(long, env = "SENSING_NODE_POSITIONS")] + node_positions: Option, + + /// Start field model calibration on boot (empty room required) + #[arg(long)] + calibrate: bool, } // ── Data types ─────────────────────────────────────────────────────────────── 
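The perturbation-energy tier of `occupancy_or_fallback` in the `field_bridge` module above reduces to a pure threshold ladder, which can be exercised in isolation. This is a sketch: the `ENERGY_THRESH_2`/`ENERGY_THRESH_3` values below are illustrative stand-ins, not the constants the crate actually defines.

```rust
// Illustrative stand-ins for the server's ENERGY_THRESH_* constants;
// the real values are defined elsewhere in the crate.
const ENERGY_THRESH_2: f64 = 4.0;
const ENERGY_THRESH_3: f64 = 9.0;

/// Map total perturbation energy to a discrete person count, mirroring the
/// fallback tier of `occupancy_or_fallback`: more bodies perturbing the
/// field means more energy outside the calibrated baseline subspace.
fn energy_to_count(total_energy: f64) -> usize {
    if total_energy > ENERGY_THRESH_3 {
        3
    } else if total_energy > ENERGY_THRESH_2 {
        2
    } else if total_energy > 1.0 {
        1
    } else {
        0
    }
}
```

With these stand-in thresholds, energies of 0.5, 2.0, 5.0, and 10.0 map to counts 0, 1, 2, and 3 respectively.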
@@ -212,6 +228,9 @@ struct SensingUpdate { /// Estimated person count from CSI feature heuristics (1-3 for single ESP32). #[serde(skip_serializing_if = "Option::is_none")] estimated_persons: Option, + /// Per-node feature breakdown for multi-node deployments. + #[serde(skip_serializing_if = "Option::is_none")] + node_features: Option>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -275,6 +294,71 @@ struct BoundingBox { height: f64, } +/// Per-node sensing state for multi-node deployments (issue #249). +/// Each ESP32 node gets its own frame history, smoothing buffers, and vital +/// sign detector so that data from different nodes is never mixed. +struct NodeState { + pub(crate) frame_history: VecDeque>, + smoothed_person_score: f64, + pub(crate) prev_person_count: usize, + smoothed_motion: f64, + current_motion_level: String, + debounce_counter: u32, + debounce_candidate: String, + baseline_motion: f64, + baseline_frames: u64, + smoothed_hr: f64, + smoothed_br: f64, + smoothed_hr_conf: f64, + smoothed_br_conf: f64, + hr_buffer: VecDeque, + br_buffer: VecDeque, + rssi_history: VecDeque, + vital_detector: VitalSignDetector, + latest_vitals: VitalSigns, + pub(crate) last_frame_time: Option, + edge_vitals: Option, +} + +impl NodeState { + pub(crate) fn new() -> Self { + Self { + frame_history: VecDeque::new(), + smoothed_person_score: 0.0, + prev_person_count: 0, + smoothed_motion: 0.0, + current_motion_level: "absent".to_string(), + debounce_counter: 0, + debounce_candidate: "absent".to_string(), + baseline_motion: 0.0, + baseline_frames: 0, + smoothed_hr: 0.0, + smoothed_br: 0.0, + smoothed_hr_conf: 0.0, + smoothed_br_conf: 0.0, + hr_buffer: VecDeque::with_capacity(8), + br_buffer: VecDeque::with_capacity(8), + rssi_history: VecDeque::new(), + vital_detector: VitalSignDetector::new(10.0), + latest_vitals: VitalSigns::default(), + last_frame_time: None, + edge_vitals: None, + } + } +} + +/// Per-node feature info for WebSocket broadcasts (multi-node support). 
+#[derive(Debug, Clone, Serialize, Deserialize)] +struct PerNodeFeatureInfo { + node_id: u8, + features: FeatureInfo, + classification: ClassificationInfo, + rssi_dbm: f64, + last_seen_ms: u64, + frame_rate_hz: f64, + stale: bool, +} + /// Shared application state struct AppStateInner { latest_update: Option, @@ -285,6 +369,8 @@ struct AppStateInner { frame_history: VecDeque>, tick: u64, source: String, + /// Instant of the last ESP32 UDP frame received (for offline detection). + last_esp32_frame: Option, tx: broadcast::Sender, total_detections: u64, start_time: std::time::Instant, @@ -304,6 +390,8 @@ struct AppStateInner { model_loaded: bool, /// Smoothed person count (EMA) for hysteresis — prevents frame-to-frame jumping. smoothed_person_score: f64, + /// Previous person count for hysteresis (asymmetric up/down thresholds). + prev_person_count: usize, // ── Motion smoothing & adaptive baseline (ADR-047 tuning) ──────────── /// EMA-smoothed motion score (alpha ~0.15 for ~10 FPS → ~1s time constant). smoothed_motion: f64, @@ -360,6 +448,63 @@ struct AppStateInner { // ── Adaptive classifier (environment-tuned) ────────────────────────── /// Trained adaptive model (loaded from data/adaptive_model.json or trained at runtime). adaptive_model: Option, + // ── Per-node state (issue #249) ───────────────────────────────────── + /// Per-node sensing state for multi-node deployments. + /// Keyed by `node_id` from the ESP32 frame header. + node_states: HashMap, + // ── Accuracy sprint: Kalman tracker, multistatic fusion, eigenvalue counting ── + /// Global Kalman-based pose tracker for stable person IDs and smoothed keypoints. + pose_tracker: PoseTracker, + /// Instant of last tracker update (for computing dt). + last_tracker_instant: Option, + /// Attention-weighted multi-node CSI fusion engine. + multistatic_fuser: MultistaticFuser, + /// SVD-based room field model for eigenvalue person counting (None until calibration). 
+    field_model: Option<FieldModel>,
+}
+
+/// If no ESP32 frame arrives within this duration, source reverts to offline.
+const ESP32_OFFLINE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
+
+impl AppStateInner {
+    /// Person count: eigenvalue-based if field model is calibrated, else heuristic.
+    /// Uses global frame_history if populated, otherwise the freshest per-node history.
+    fn person_count(&self) -> usize {
+        match self.field_model.as_ref() {
+            Some(fm) => {
+                // Prefer global frame_history (populated by wifi/simulate paths).
+                // Fall back to freshest per-node history (populated by ESP32 paths).
+                let history = if !self.frame_history.is_empty() {
+                    &self.frame_history
+                } else {
+                    // Find the node with the most recent frame
+                    self.node_states.values()
+                        .filter(|ns| !ns.frame_history.is_empty())
+                        .max_by_key(|ns| ns.last_frame_time)
+                        .map(|ns| &ns.frame_history)
+                        .unwrap_or(&self.frame_history)
+                };
+                field_bridge::occupancy_or_fallback(
+                    fm, history, self.smoothed_person_score, self.prev_person_count,
+                )
+            }
+            None => score_to_person_count(self.smoothed_person_score, self.prev_person_count),
+        }
+    }
+
+    /// Return the effective data source, accounting for ESP32 frame timeout.
+    /// If the source is "esp32" but no frame has arrived in 5 seconds, returns
+    /// "esp32:offline" so the UI can distinguish active vs stale connections.
+    fn effective_source(&self) -> String {
+        if self.source == "esp32" {
+            if let Some(last) = self.last_esp32_frame {
+                if last.elapsed() > ESP32_OFFLINE_TIMEOUT {
+                    return "esp32:offline".to_string();
+                }
+            }
+        }
+        self.source.clone()
+    }
+}
 }

/// Number of frames retained in `frame_history` for temporal analysis.
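The offline check above hinges on a single elapsed-time comparison against `ESP32_OFFLINE_TIMEOUT`. Factored over an explicit `Duration` (a sketch for illustration, not the server's actual method signature) the decision becomes trivially testable without waiting out the timeout:

```rust
use std::time::Duration;

const ESP32_OFFLINE_TIMEOUT: Duration = Duration::from_secs(5);

/// Pure core of the offline check: given the time since the last ESP32
/// frame (None = no frame timestamp recorded), pick the label the UI shows.
/// Matches the diff's behaviour: with no timestamp recorded, the source is
/// left as-is rather than flagged offline.
fn effective_source(source: &str, since_last_frame: Option<Duration>) -> String {
    if source == "esp32" {
        if let Some(elapsed) = since_last_frame {
            if elapsed > ESP32_OFFLINE_TIMEOUT {
                return "esp32:offline".to_string();
            }
        }
    }
    source.to_string()
}
```

A recent frame keeps the label `"esp32"`; anything over five seconds stale becomes `"esp32:offline"`, and non-ESP32 sources pass through untouched.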
@@ -490,7 +635,9 @@ fn parse_esp32_frame(buf: &[u8]) -> Option { let n_subcarriers = buf[6]; let freq_mhz = u16::from_le_bytes([buf[8], buf[9]]); let sequence = u32::from_le_bytes([buf[10], buf[11], buf[12], buf[13]]); - let rssi = buf[14] as i8; + let rssi_raw = buf[14] as i8; + // Fix RSSI sign: ensure it's always negative (dBm convention). + let rssi = if rssi_raw > 0 { rssi_raw.saturating_neg() } else { rssi_raw }; let noise_floor = buf[15] as i8; let iq_start = 20; @@ -941,6 +1088,44 @@ fn smooth_and_classify(state: &mut AppStateInner, raw: &mut ClassificationInfo, raw.confidence = (0.4 + sm * 0.6).clamp(0.0, 1.0); } +/// Per-node variant of `smooth_and_classify` that operates on a `NodeState` +/// instead of `AppStateInner` (issue #249). +fn smooth_and_classify_node(ns: &mut NodeState, raw: &mut ClassificationInfo, raw_motion: f64) { + ns.baseline_frames += 1; + if ns.baseline_frames < BASELINE_WARMUP { + ns.baseline_motion = ns.baseline_motion * 0.9 + raw_motion * 0.1; + } else if raw_motion < ns.smoothed_motion + 0.05 { + ns.baseline_motion = ns.baseline_motion * (1.0 - BASELINE_EMA_ALPHA) + + raw_motion * BASELINE_EMA_ALPHA; + } + + let adjusted = (raw_motion - ns.baseline_motion * 0.7).max(0.0); + + ns.smoothed_motion = ns.smoothed_motion * (1.0 - MOTION_EMA_ALPHA) + + adjusted * MOTION_EMA_ALPHA; + let sm = ns.smoothed_motion; + + let candidate = raw_classify(sm); + + if candidate == ns.current_motion_level { + ns.debounce_counter = 0; + ns.debounce_candidate = candidate; + } else if candidate == ns.debounce_candidate { + ns.debounce_counter += 1; + if ns.debounce_counter >= DEBOUNCE_FRAMES { + ns.current_motion_level = candidate; + ns.debounce_counter = 0; + } + } else { + ns.debounce_candidate = candidate; + ns.debounce_counter = 1; + } + + raw.motion_level = ns.current_motion_level.clone(); + raw.presence = sm > 0.03; + raw.confidence = (0.4 + sm * 0.6).clamp(0.0, 1.0); +} + /// If an adaptive model is loaded, override the classification with the /// 
model's prediction. Uses the full 15-feature vector for higher accuracy. fn adaptive_override(state: &AppStateInner, features: &FeatureInfo, classification: &mut ClassificationInfo) { @@ -1041,6 +1226,55 @@ fn smooth_vitals(state: &mut AppStateInner, raw: &VitalSigns) -> VitalSigns { } } +/// Per-node variant of `smooth_vitals` that operates on a `NodeState` (issue #249). +fn smooth_vitals_node(ns: &mut NodeState, raw: &VitalSigns) -> VitalSigns { + let raw_hr = raw.heart_rate_bpm.unwrap_or(0.0); + let raw_br = raw.breathing_rate_bpm.unwrap_or(0.0); + + let hr_ok = ns.smoothed_hr < 1.0 || (raw_hr - ns.smoothed_hr).abs() < HR_MAX_JUMP; + let br_ok = ns.smoothed_br < 1.0 || (raw_br - ns.smoothed_br).abs() < BR_MAX_JUMP; + + if hr_ok && raw_hr > 0.0 { + ns.hr_buffer.push_back(raw_hr); + if ns.hr_buffer.len() > VITAL_MEDIAN_WINDOW { ns.hr_buffer.pop_front(); } + } + if br_ok && raw_br > 0.0 { + ns.br_buffer.push_back(raw_br); + if ns.br_buffer.len() > VITAL_MEDIAN_WINDOW { ns.br_buffer.pop_front(); } + } + + let trimmed_hr = trimmed_mean(&ns.hr_buffer); + let trimmed_br = trimmed_mean(&ns.br_buffer); + + if trimmed_hr > 0.0 { + if ns.smoothed_hr < 1.0 { + ns.smoothed_hr = trimmed_hr; + } else if (trimmed_hr - ns.smoothed_hr).abs() > HR_DEAD_BAND { + ns.smoothed_hr = ns.smoothed_hr * (1.0 - VITAL_EMA_ALPHA) + + trimmed_hr * VITAL_EMA_ALPHA; + } + } + if trimmed_br > 0.0 { + if ns.smoothed_br < 1.0 { + ns.smoothed_br = trimmed_br; + } else if (trimmed_br - ns.smoothed_br).abs() > BR_DEAD_BAND { + ns.smoothed_br = ns.smoothed_br * (1.0 - VITAL_EMA_ALPHA) + + trimmed_br * VITAL_EMA_ALPHA; + } + } + + ns.smoothed_hr_conf = ns.smoothed_hr_conf * 0.92 + raw.heartbeat_confidence * 0.08; + ns.smoothed_br_conf = ns.smoothed_br_conf * 0.92 + raw.breathing_confidence * 0.08; + + VitalSigns { + breathing_rate_bpm: if ns.smoothed_br > 1.0 { Some(ns.smoothed_br) } else { None }, + heart_rate_bpm: if ns.smoothed_hr > 1.0 { Some(ns.smoothed_hr) } else { None }, + breathing_confidence: 
ns.smoothed_br_conf, + heartbeat_confidence: ns.smoothed_hr_conf, + signal_quality: raw.signal_quality, + } +} + /// Trimmed mean: sort, drop top/bottom 25%, average the middle 50%. /// More robust than median (uses more data) and less noisy than raw mean. fn trimmed_mean(buf: &VecDeque) -> f64 { @@ -1247,12 +1481,15 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) { let feat_variance = features.variance; - // Multi-person estimation with temporal smoothing (EMA α=0.15). + // Multi-person estimation with temporal smoothing (EMA α=0.10). let raw_score = compute_person_score(&features); - s.smoothed_person_score = s.smoothed_person_score * 0.85 + raw_score * 0.15; + s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10; let est_persons = if classification.presence { - score_to_person_count(s.smoothed_person_score) + let count = s.person_count(); + s.prev_person_count = count; + count } else { + s.prev_person_count = 0; 0 }; @@ -1285,12 +1522,16 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) { model_status: None, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + node_features: None, }; - // Populate persons from the sensing update. - let persons = derive_pose_from_sensing(&update); - if !persons.is_empty() { - update.persons = Some(persons); + // Populate persons from the sensing update (Kalman-smoothed via tracker). + let raw_persons = derive_pose_from_sensing(&update); + let tracked = tracker_bridge::tracker_update( + &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons, + ); + if !tracked.is_empty() { + update.persons = Some(tracked); } if let Ok(json) = serde_json::to_string(&update) { @@ -1377,12 +1618,15 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) { let feat_variance = features.variance; - // Multi-person estimation with temporal smoothing. + // Multi-person estimation with temporal smoothing (EMA α=0.10). 
let raw_score = compute_person_score(&features); - s.smoothed_person_score = s.smoothed_person_score * 0.85 + raw_score * 0.15; + s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10; let est_persons = if classification.presence { - score_to_person_count(s.smoothed_person_score) + let count = s.person_count(); + s.prev_person_count = count; + count } else { + s.prev_person_count = 0; 0 }; @@ -1415,11 +1659,15 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) { model_status: None, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + node_features: None, }; - let persons = derive_pose_from_sensing(&update); - if !persons.is_empty() { - update.persons = Some(persons); + let raw_persons = derive_pose_from_sensing(&update); + let tracked = tracker_bridge::tracker_update( + &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons, + ); + if !tracked.is_empty() { + update.persons = Some(tracked); } if let Ok(json) = serde_json::to_string(&update) { @@ -1596,9 +1844,13 @@ async fn handle_ws_pose_client(mut socket: WebSocket, state: SharedState) { keypoints, zone: "zone_1".into(), }] - }).unwrap_or_else(|| derive_pose_from_sensing(&sensing)) + }).unwrap_or_else(|| { + // Prefer tracked persons from broadcast if available + sensing.persons.clone().unwrap_or_else(|| derive_pose_from_sensing(&sensing)) + }) } else { - derive_pose_from_sensing(&sensing) + // Prefer tracked persons from broadcast if available + sensing.persons.clone().unwrap_or_else(|| derive_pose_from_sensing(&sensing)) }; let pose_msg = serde_json::json!({ @@ -1661,7 +1913,7 @@ async fn health(State(state): State) -> Json { let s = state.read().await; Json(serde_json::json!({ "status": "ok", - "source": s.source, + "source": s.effective_source(), "tick": s.tick, "clients": s.tx.receiver_count(), })) @@ -1724,18 +1976,45 @@ fn compute_person_score(feat: &FeatureInfo) -> f64 { /// Convert smoothed person score to discrete 
count with hysteresis. /// -/// Uses asymmetric thresholds: higher threshold to add a person, lower to remove. -/// This prevents flickering at the boundary. -fn score_to_person_count(smoothed_score: f64) -> usize { - // Thresholds chosen conservatively for single-ESP32 link: - // score > 0.50 → 2 persons (needs sustained high variance + change points) - // score > 0.80 → 3 persons (very high activity, rare with single link) - if smoothed_score > 0.80 { - 3 - } else if smoothed_score > 0.50 { - 2 - } else { - 1 +/// Uses asymmetric thresholds: higher threshold to *add* a person, lower to +/// *drop* one. This prevents flickering when the score hovers near a boundary +/// (the #1 user-reported issue — see #237, #249, #280, #292). +fn score_to_person_count(smoothed_score: f64, prev_count: usize) -> usize { + // Up-thresholds (must exceed to increase count): + // 1→2: 0.65 (raised from 0.50 — multipath in small rooms hit 0.50 easily) + // 2→3: 0.85 (raised from 0.80 — 3 persons needs strong sustained signal) + // Down-thresholds (must drop below to decrease count): + // 2→1: 0.45 (hysteresis gap of 0.20) + // 3→2: 0.70 (hysteresis gap of 0.15) + match prev_count { + 0 | 1 => { + if smoothed_score > 0.85 { + 3 + } else if smoothed_score > 0.65 { + 2 + } else { + 1 + } + } + 2 => { + if smoothed_score > 0.85 { + 3 + } else if smoothed_score < 0.45 { + 1 + } else { + 2 // hold — within hysteresis band + } + } + _ => { + // prev_count >= 3 + if smoothed_score < 0.45 { + 1 + } else if smoothed_score < 0.70 { + 2 + } else { + 3 // hold + } + } } } @@ -1942,7 +2221,7 @@ async fn health_ready(State(state): State) -> Json) -> Json 0 { "healthy" } else { "idle" }, "message": format!("{} client(s)", s.tx.receiver_count()) }, @@ -1993,7 +2275,7 @@ async fn api_info(State(state): State) -> Json { "version": env!("CARGO_PKG_VERSION"), "environment": "production", "backend": "rust", - "source": s.source, + "source": s.effective_source(), "features": { "wifi_sensing": true, 
"pose_estimation": true, @@ -2007,14 +2289,14 @@ async fn api_info(State(state): State) -> Json { async fn pose_current(State(state): State) -> Json { let s = state.read().await; let persons = match &s.latest_update { - Some(update) => derive_pose_from_sensing(update), + Some(update) => update.persons.clone().unwrap_or_else(|| derive_pose_from_sensing(update)), None => vec![], }; Json(serde_json::json!({ "timestamp": chrono::Utc::now().timestamp_millis() as f64 / 1000.0, "persons": persons, "total_persons": persons.len(), - "source": s.source, + "source": s.effective_source(), })) } @@ -2024,7 +2306,7 @@ async fn pose_stats(State(state): State) -> Json "total_detections": s.total_detections, "average_confidence": 0.87, "frames_processed": s.tick, - "source": s.source, + "source": s.effective_source(), })) } @@ -2048,7 +2330,7 @@ async fn stream_status(State(state): State) -> Json 1 { 10u64 } else { 0u64 }, - "source": s.source, + "source": s.effective_source(), })) } @@ -2558,6 +2840,88 @@ async fn adaptive_unload(State(state): State) -> Json) -> Json { + let mut s = state.write().await; + // Guard: don't discard an in-progress or fresh calibration + if let Some(ref fm) = s.field_model { + match fm.status() { + CalibrationStatus::Collecting => { + return Json(serde_json::json!({ + "success": false, + "error": "Calibration already in progress. Call /calibration/stop first.", + "frame_count": fm.calibration_frame_count(), + })); + } + CalibrationStatus::Fresh => { + return Json(serde_json::json!({ + "success": false, + "error": "A fresh calibration already exists. 
Call /calibration/stop or wait for expiry.", + })); + } + _ => {} // Stale/Expired/Uncalibrated — ok to recalibrate + } + } + match FieldModel::new(field_bridge::single_link_config()) { + Ok(fm) => { + s.field_model = Some(fm); + Json(serde_json::json!({ + "success": true, + "message": "Calibration started — keep room empty while frames accumulate.", + })) + } + Err(e) => Json(serde_json::json!({ + "success": false, + "error": format!("{e}"), + })), + } +} + +async fn calibration_stop(State(state): State) -> Json { + let mut s = state.write().await; + if let Some(ref mut fm) = s.field_model { + let ts = chrono::Utc::now().timestamp_micros() as u64; + match fm.finalize_calibration(ts, 0) { + Ok(modes) => { + let baseline = modes.baseline_eigenvalue_count; + let variance_explained = modes.variance_explained; + info!("Field model calibrated: baseline_eigenvalues={baseline}, variance_explained={variance_explained:.2}"); + Json(serde_json::json!({ + "success": true, + "baseline_eigenvalue_count": baseline, + "variance_explained": variance_explained, + "frame_count": fm.calibration_frame_count(), + })) + } + Err(e) => Json(serde_json::json!({ + "success": false, + "error": format!("{e}"), + })), + } + } else { + Json(serde_json::json!({ + "success": false, + "error": "No field model active — call /calibration/start first.", + })) + } +} + +async fn calibration_status(State(state): State) -> Json { + let s = state.read().await; + match s.field_model.as_ref() { + Some(fm) => Json(serde_json::json!({ + "active": true, + "status": format!("{:?}", fm.status()), + "frame_count": fm.calibration_frame_count(), + })), + None => Json(serde_json::json!({ + "active": false, + "status": "none", + })), + } +} + /// Generate a simple timestamp string (epoch seconds) for recording IDs. 
fn chrono_timestamp() -> u64 { std::time::SystemTime::now() @@ -2584,7 +2948,7 @@ async fn vital_signs_endpoint(State(state): State) -> Json) -> Json { + let s = state.read().await; + let now = std::time::Instant::now(); + let nodes: Vec = s.node_states.iter() + .map(|(&id, ns)| { + let elapsed_ms = ns.last_frame_time + .map(|t| now.duration_since(t).as_millis() as u64) + .unwrap_or(999999); + let stale = elapsed_ms > 5000; + let status = if stale { "stale" } else { "active" }; + let rssi = ns.rssi_history.back().copied().unwrap_or(-90.0); + serde_json::json!({ + "node_id": id, + "status": status, + "last_seen_ms": elapsed_ms, + "rssi_dbm": rssi, + "motion_level": &ns.current_motion_level, + "person_count": ns.prev_person_count, + }) + }) + .collect(); + Json(serde_json::json!({ + "nodes": nodes, + "total": nodes.len(), + })) +} + async fn info_page() -> Html { Html(format!( "\ @@ -2761,6 +3153,140 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { })) { let _ = s.tx.send(json); } + + // Issue #323: Also emit a sensing_update so the UI renders + // detections for ESP32 nodes running the edge DSP pipeline + // (Tier 2+). Without this, vitals arrive but the UI shows + // "no detection" because it only renders sensing_update msgs. + s.source = "esp32".to_string(); + s.last_esp32_frame = Some(std::time::Instant::now()); + + // ── Per-node state for edge vitals (issue #249) ────── + let node_id = vitals.node_id; + let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new); + ns.last_frame_time = Some(std::time::Instant::now()); + ns.edge_vitals = Some(vitals.clone()); + ns.rssi_history.push_back(vitals.rssi as f64); + if ns.rssi_history.len() > 60 { ns.rssi_history.pop_front(); } + + // Store per-node person count from edge vitals. 
+ let node_est = if vitals.presence { + (vitals.n_persons as usize).max(1) + } else { + 0 + }; + ns.prev_person_count = node_est; + + s.tick += 1; + let tick = s.tick; + + let motion_level = if vitals.motion { "present_moving" } + else if vitals.presence { "present_still" } + else { "absent" }; + let motion_score = if vitals.motion { 0.8 } + else if vitals.presence { 0.3 } + else { 0.05 }; + + // Aggregate person count: gate on presence first (matching WiFi path). + let now = std::time::Instant::now(); + let total_persons = if vitals.presence { + let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback( + &s.multistatic_fuser, &s.node_states, + ); + match fused { + Some(ref f) => { + let score = multistatic_bridge::compute_person_score_from_amplitudes(&f.fused_amplitude); + s.smoothed_person_score = s.smoothed_person_score * 0.90 + score * 0.10; + let count = s.person_count(); + s.prev_person_count = count; + count.max(1) // presence=true => at least 1 + } + None => fallback_count.unwrap_or(0).max(1), + } + } else { + s.prev_person_count = 0; + 0 + }; + + // Feed field model calibration if active (use per-node history for ESP32). + if let Some(ref mut fm) = s.field_model { + if let Some(ns) = s.node_states.get(&node_id) { + field_bridge::maybe_feed_calibration(fm, &ns.frame_history); + } + } + + // Build nodes array with all active nodes. 
+ let active_nodes: Vec = s.node_states.iter() + .filter(|(_, n)| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10)) + .map(|(&id, n)| NodeInfo { + node_id: id, + rssi_dbm: n.rssi_history.back().copied().unwrap_or(0.0), + position: [2.0, 0.0, 1.5], + amplitude: vec![], + subcarrier_count: 0, + }) + .collect(); + + let features = FeatureInfo { + mean_rssi: vitals.rssi as f64, + variance: vitals.motion_energy as f64, + motion_band_power: vitals.motion_energy as f64, + breathing_band_power: if vitals.presence { 0.5 } else { 0.0 }, + dominant_freq_hz: vitals.breathing_rate_bpm / 60.0, + change_points: 0, + spectral_power: vitals.motion_energy as f64, + }; + let classification = ClassificationInfo { + motion_level: motion_level.to_string(), + presence: vitals.presence, + confidence: vitals.presence_score as f64, + }; + let signal_field = generate_signal_field( + vitals.rssi as f64, motion_score, vitals.breathing_rate_bpm / 60.0, + (vitals.presence_score as f64).min(1.0), &[], + ); + + let mut update = SensingUpdate { + msg_type: "sensing_update".to_string(), + timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0, + source: "esp32".to_string(), + tick, + nodes: active_nodes, + features: features.clone(), + classification, + signal_field, + vital_signs: Some(VitalSigns { + breathing_rate_bpm: if vitals.breathing_rate_bpm > 0.0 { Some(vitals.breathing_rate_bpm) } else { None }, + heart_rate_bpm: if vitals.heartrate_bpm > 0.0 { Some(vitals.heartrate_bpm) } else { None }, + breathing_confidence: if vitals.presence { 0.7 } else { 0.0 }, + heartbeat_confidence: if vitals.presence { 0.7 } else { 0.0 }, + signal_quality: vitals.presence_score as f64, + }), + enhanced_motion: None, + enhanced_breathing: None, + posture: None, + signal_quality_score: None, + quality_verdict: None, + bssid_count: None, + pose_keypoints: None, + model_status: None, + persons: None, + estimated_persons: if total_persons > 0 { Some(total_persons) } else { None }, + 
node_features: None, + }; + + let raw_persons = derive_pose_from_sensing(&update); + let tracked = tracker_bridge::tracker_update( + &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons, + ); + if !tracked.is_empty() { + update.persons = Some(tracked); + } + + if let Ok(json) = serde_json::to_string(&update) { + let _ = s.tx.send(json); + } + s.latest_update = Some(update); s.edge_vitals = Some(vitals); continue; } @@ -2790,25 +3316,92 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { let mut s = state.write().await; s.source = "esp32".to_string(); + s.last_esp32_frame = Some(std::time::Instant::now()); - // Append current amplitudes to history before extracting features so - // that temporal analysis includes the most recent frame. + // Also maintain global frame_history for backward compat + // (simulation path, REST endpoints, etc.). s.frame_history.push_back(frame.amplitudes.clone()); if s.frame_history.len() > FRAME_HISTORY_CAPACITY { s.frame_history.pop_front(); } - let sample_rate_hz = 1000.0 / 500.0_f64; // default tick; ESP32 frames arrive as fast as they come + // ── Per-node processing (issue #249) ────────────────── + // Process entirely within per-node state so different + // ESP32 nodes never mix their smoothing/vitals buffers. + // We scope the mutable borrow of node_states so we can + // access other AppStateInner fields afterward. 
+ let node_id = frame.node_id; + let adaptive_model_ref = s.adaptive_model.as_ref().map(|m| m as *const _); + let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new); + ns.last_frame_time = Some(std::time::Instant::now()); + + ns.frame_history.push_back(frame.amplitudes.clone()); + if ns.frame_history.len() > FRAME_HISTORY_CAPACITY { + ns.frame_history.pop_front(); + } + + let sample_rate_hz = 1000.0 / 500.0_f64; let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) = - extract_features_from_frame(&frame, &s.frame_history, sample_rate_hz); - smooth_and_classify(&mut s, &mut classification, raw_motion); - adaptive_override(&s, &features, &mut classification); + extract_features_from_frame(&frame, &ns.frame_history, sample_rate_hz); + smooth_and_classify_node(ns, &mut classification, raw_motion); + + // SAFETY: adaptive_model_ref points into s which we hold + // via write lock; the model is not mutated here. We use a + // raw pointer to break the borrow-checker deadlock between + // node_states and adaptive_model (both inside s). 
+ if let Some(model_ptr) = adaptive_model_ref { + let model: &adaptive_classifier::AdaptiveModel = unsafe { &*model_ptr }; + let amps = ns.frame_history.back() + .map(|v| v.as_slice()) + .unwrap_or(&[]); + let feat_arr = adaptive_classifier::features_from_runtime( + &serde_json::json!({ + "variance": features.variance, + "motion_band_power": features.motion_band_power, + "breathing_band_power": features.breathing_band_power, + "spectral_power": features.spectral_power, + "dominant_freq_hz": features.dominant_freq_hz, + "change_points": features.change_points, + "mean_rssi": features.mean_rssi, + }), + amps, + ); + let (label, conf) = model.classify(&feat_arr); + classification.motion_level = label.to_string(); + classification.presence = label != "absent"; + classification.confidence = (conf * 0.7 + classification.confidence * 0.3).clamp(0.0, 1.0); + } + + ns.rssi_history.push_back(features.mean_rssi); + if ns.rssi_history.len() > 60 { + ns.rssi_history.pop_front(); + } + + let raw_vitals = ns.vital_detector.process_frame( + &frame.amplitudes, + &frame.phases, + ); + let vitals = smooth_vitals_node(ns, &raw_vitals); + ns.latest_vitals = vitals.clone(); + + let raw_score = compute_person_score(&features); + ns.smoothed_person_score = ns.smoothed_person_score * 0.90 + raw_score * 0.10; + if classification.presence { + let count = score_to_person_count(ns.smoothed_person_score, ns.prev_person_count); + ns.prev_person_count = count; + } else { + ns.prev_person_count = 0; + } + + // Done with per-node mutable borrow; now read aggregated + // state from all nodes (the borrow of `ns` ends here). + // (We re-borrow node_states immutably via `s` below.) 
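The `SAFETY` block above works around a field-level borrow conflict with a raw pointer. As an aside (not a change to the diff's code), the same disjoint-field access can often be expressed safely by destructuring the struct first, so the compiler sees that the mutable and shared borrows touch different fields. A minimal illustration with stand-in types, not the server's real state:

```rust
use std::collections::HashMap;

// Stand-in types for illustration only.
struct Node { score: f64 }
struct Model { bias: f64 }
struct State {
    nodes: HashMap<u8, Node>,
    model: Option<Model>,
}

/// Borrow one node mutably and the model immutably at the same time by
/// splitting the struct into disjoint field borrows — no `unsafe` needed.
fn classify_node(state: &mut State, id: u8) {
    let State { nodes, model } = state; // disjoint field borrows
    if let (Some(node), Some(m)) = (nodes.get_mut(&id), model.as_ref()) {
        node.score += m.bias;
    }
}
```

Whether this refactor is worth it here depends on how the surrounding locks and entry-API borrows are structured; the raw-pointer route in the diff is functionally equivalent when the model is never mutated during the borrow.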
- // Update RSSI history s.rssi_history.push_back(features.mean_rssi); if s.rssi_history.len() > 60 { s.rssi_history.pop_front(); } + s.latest_vitals = vitals.clone(); s.tick += 1; let tick = s.tick; @@ -2817,34 +3410,54 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { else if classification.motion_level == "present_still" { 0.3 } else { 0.05 }; - let raw_vitals = s.vital_detector.process_frame( - &frame.amplitudes, - &frame.phases, - ); - let vitals = smooth_vitals(&mut s, &raw_vitals); - s.latest_vitals = vitals.clone(); - - // Multi-person estimation with temporal smoothing. - let raw_score = compute_person_score(&features); - s.smoothed_person_score = s.smoothed_person_score * 0.85 + raw_score * 0.15; - let est_persons = if classification.presence { - score_to_person_count(s.smoothed_person_score) + // Aggregate person count: gate on presence first (matching WiFi path). + let now = std::time::Instant::now(); + let total_persons = if classification.presence { + let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback( + &s.multistatic_fuser, &s.node_states, + ); + match fused { + Some(ref f) => { + let score = multistatic_bridge::compute_person_score_from_amplitudes(&f.fused_amplitude); + s.smoothed_person_score = s.smoothed_person_score * 0.90 + score * 0.10; + let count = s.person_count(); + s.prev_person_count = count; + count.max(1) + } + None => fallback_count.unwrap_or(0).max(1), + } } else { + s.prev_person_count = 0; 0 }; + // Feed field model calibration if active (use per-node history for ESP32). + if let Some(ref mut fm) = s.field_model { + if let Some(ns) = s.node_states.get(&node_id) { + field_bridge::maybe_feed_calibration(fm, &ns.frame_history); + } + } + + // Build nodes array with all active nodes. 
+ let active_nodes: Vec<NodeInfo> = s.node_states.iter() + .filter(|(_, n)| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10)) + .map(|(&id, n)| NodeInfo { + node_id: id, + rssi_dbm: n.rssi_history.back().copied().unwrap_or(0.0), + position: [2.0, 0.0, 1.5], + amplitude: n.frame_history.back() + .map(|a| a.iter().take(56).cloned().collect()) + .unwrap_or_default(), + subcarrier_count: n.frame_history.back().map_or(0, |a| a.len()), + }) + .collect(); + let mut update = SensingUpdate { msg_type: "sensing_update".to_string(), timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0, source: "esp32".to_string(), tick, - nodes: vec![NodeInfo { - node_id: frame.node_id, - rssi_dbm: features.mean_rssi, - position: [2.0, 0.0, 1.5], - amplitude: frame.amplitudes.iter().take(56).cloned().collect(), - subcarrier_count: frame.n_subcarriers as usize, - }], + nodes: active_nodes, features: features.clone(), classification, signal_field: generate_signal_field( @@ -2861,12 +3474,16 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { pose_keypoints: None, model_status: None, persons: None, - estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + estimated_persons: if total_persons > 0 { Some(total_persons) } else { None }, + node_features: None, }; - let persons = derive_pose_from_sensing(&update); - if !persons.is_empty() { - update.persons = Some(persons); + let raw_persons = derive_pose_from_sensing(&update); + let tracked = tracker_bridge::tracker_update( + &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons, + ); + if !tracked.is_empty() { + update.persons = Some(tracked); } if let Ok(json) = serde_json::to_string(&update) { @@ -2929,12 +3546,15 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) { let frame_amplitudes = frame.amplitudes.clone(); let frame_n_sub = frame.n_subcarriers; - // Multi-person estimation with temporal smoothing.
+ // Multi-person estimation with temporal smoothing (EMA α=0.10). let raw_score = compute_person_score(&features); - s.smoothed_person_score = s.smoothed_person_score * 0.85 + raw_score * 0.15; + s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10; let est_persons = if classification.presence { - score_to_person_count(s.smoothed_person_score) + let count = s.person_count(); + s.prev_person_count = count; + count } else { + s.prev_person_count = 0; 0 }; @@ -2977,12 +3597,16 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) { }, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + node_features: None, }; - // Populate persons from the sensing update. - let persons = derive_pose_from_sensing(&update); - if !persons.is_empty() { - update.persons = Some(persons); + // Populate persons from the sensing update (Kalman-smoothed via tracker). + let raw_persons = derive_pose_from_sensing(&update); + let tracked = tracker_bridge::tracker_update( + &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons, + ); + if !tracked.is_empty() { + update.persons = Some(tracked); } if update.classification.presence { @@ -3566,6 +4190,7 @@ async fn main() { frame_history: VecDeque::new(), tick: 0, source: source.into(), + last_esp32_frame: None, tx, total_detections: 0, start_time: std::time::Instant::now(), @@ -3577,6 +4202,7 @@ async fn main() { active_sona_profile: None, model_loaded, smoothed_person_score: 0.0, + prev_person_count: 0, smoothed_motion: 0.0, current_motion_level: "absent".to_string(), debounce_counter: 0, @@ -3608,6 +4234,30 @@ async fn main() { m.trained_frames, m.training_accuracy * 100.0); m }), + node_states: HashMap::new(), + // Accuracy sprint + pose_tracker: PoseTracker::new(), + last_tracker_instant: None, + multistatic_fuser: { + let mut fuser = MultistaticFuser::with_config(MultistaticConfig { + min_nodes: 1, // single-node passthrough + ..Default::default() + }); + if let 
Some(ref pos_str) = args.node_positions { + let positions = field_bridge::parse_node_positions(pos_str); + if !positions.is_empty() { + info!("Configured {} node positions for multistatic fusion", positions.len()); + fuser.set_node_positions(positions); + } + } + fuser + }, + field_model: if args.calibrate { + info!("Field model calibration enabled — room should be empty during startup"); + FieldModel::new(field_bridge::single_link_config()).ok() + } else { + None + }, })); // Start background tasks based on source @@ -3661,6 +4311,8 @@ async fn main() { .route("/api/v1/metrics", get(health_metrics)) // Sensing endpoints .route("/api/v1/sensing/latest", get(latest)) + // Per-node health endpoint + .route("/api/v1/nodes", get(nodes_endpoint)) // Vital sign endpoints .route("/api/v1/vital-signs", get(vital_signs_endpoint)) .route("/api/v1/edge-vitals", get(edge_vitals_endpoint)) @@ -3702,6 +4354,10 @@ async fn main() { .route("/api/v1/adaptive/train", post(adaptive_train)) .route("/api/v1/adaptive/status", get(adaptive_status)) .route("/api/v1/adaptive/unload", post(adaptive_unload)) + // Field model calibration (eigenvalue-based person counting) + .route("/api/v1/calibration/start", post(calibration_start)) + .route("/api/v1/calibration/stop", post(calibration_stop)) + .route("/api/v1/calibration/status", get(calibration_status)) // Static UI files .nest_service("/ui", ServeDir::new(&ui_path)) .layer(SetResponseHeaderLayer::overriding( @@ -3739,7 +4395,7 @@ async fn main() { "WiFi DensePose sensing model state", ); builder.add_metadata(&serde_json::json!({ - "source": s.source, + "source": s.effective_source(), "total_ticks": s.tick, "total_detections": s.total_detections, "uptime_secs": s.start_time.elapsed().as_secs(), diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs new file mode 100644 index 000000000..794b15bc7 --- 
/dev/null +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs @@ -0,0 +1,264 @@ +//! Bridge between sensing-server per-node state and the signal crate's +//! `MultistaticFuser` for attention-weighted CSI fusion across ESP32 nodes. +//! +//! This module converts the server's `NodeState` (f64 amplitude history) into +//! `MultiBandCsiFrame`s that the multistatic fusion pipeline expects, then +//! drives `MultistaticFuser::fuse` with a graceful fallback when fusion fails +//! (e.g. insufficient nodes or timestamp spread). + +use std::collections::HashMap; +use std::sync::LazyLock; +use std::time::{Duration, Instant}; + +use wifi_densepose_signal::hardware_norm::{CanonicalCsiFrame, HardwareType}; +use wifi_densepose_signal::ruvsense::multiband::MultiBandCsiFrame; +use wifi_densepose_signal::ruvsense::multistatic::{FusedSensingFrame, MultistaticFuser}; + +use super::NodeState; + +/// Maximum age for a node frame to be considered active (10 seconds). +const STALE_THRESHOLD: Duration = Duration::from_secs(10); + +/// Default WiFi channel frequency (MHz) used for single-channel frames. +const DEFAULT_FREQ_MHZ: u32 = 2437; // Channel 6 + +/// Monotonic reference point for timestamp generation. All node timestamps +/// are relative to this instant, avoiding wall-clock/monotonic mixing issues. +static EPOCH: LazyLock<Instant> = LazyLock::new(Instant::now); + +/// Convert a single `NodeState` into a `MultiBandCsiFrame` suitable for +/// multistatic fusion. +/// +/// Returns `None` when the node has no frame history or no recorded +/// `last_frame_time`.
+pub fn node_frame_from_state(node_id: u8, ns: &NodeState) -> Option<MultiBandCsiFrame> { + let last_time = ns.last_frame_time.as_ref()?; + let latest = ns.frame_history.back()?; + if latest.is_empty() { + return None; + } + + let amplitude: Vec<f32> = latest.iter().map(|&v| v as f32).collect(); + let n_sub = amplitude.len(); + let phase = vec![0.0_f32; n_sub]; + + // Monotonic timestamp: microseconds since a shared process-local epoch. + // All nodes use the same reference so the fuser's guard_interval_us check + // compares apples to apples. No wall-clock mixing (immune to NTP jumps). + let timestamp_us = last_time.duration_since(*EPOCH).as_micros() as u64; + + let canonical = CanonicalCsiFrame { + amplitude, + phase, + hardware_type: HardwareType::Esp32S3, + }; + + Some(MultiBandCsiFrame { + node_id, + timestamp_us, + channel_frames: vec![canonical], + frequencies_mhz: vec![DEFAULT_FREQ_MHZ], + coherence: 1.0, // single-channel, perfect self-coherence + }) +} + +/// Collect `MultiBandCsiFrame`s from all active nodes. +/// +/// A node is considered active if its `last_frame_time` is within +/// [`STALE_THRESHOLD`] of `now`. +pub fn node_frames_from_states(node_states: &HashMap<u8, NodeState>) -> Vec<MultiBandCsiFrame> { + let now = Instant::now(); + let mut frames = Vec::with_capacity(node_states.len()); + + for (&node_id, ns) in node_states { + // Skip stale nodes + if let Some(ref t) = ns.last_frame_time { + if now.duration_since(*t) > STALE_THRESHOLD { + continue; + } + } else { + continue; + } + + if let Some(frame) = node_frame_from_state(node_id, ns) { + frames.push(frame); + } + } + + frames +} + +/// Attempt multistatic fusion; fall back to max per-node person count on failure. +/// +/// Returns `(fused_frame, fallback_person_count)`. When fusion succeeds, +/// `fallback_person_count` is `None` — the caller must compute count from +/// the fused amplitudes. On failure, returns the maximum per-node count +/// (not the sum, to avoid double-counting overlapping coverage).
+pub fn fuse_or_fallback( + fuser: &MultistaticFuser, + node_states: &HashMap<u8, NodeState>, +) -> (Option<FusedSensingFrame>, Option<usize>) { + let frames = node_frames_from_states(node_states); + if frames.is_empty() { + return (None, Some(0)); + } + + match fuser.fuse(&frames) { + Ok(fused) => { + // Caller must compute person count from fused amplitudes. + (Some(fused), None) + } + Err(e) => { + tracing::debug!("Multistatic fusion failed ({e}), using per-node max fallback"); + // Use max (not sum) to avoid double-counting when nodes have overlapping coverage. + let max_count: usize = node_states + .values() + .filter(|ns| { + ns.last_frame_time + .map(|t| t.elapsed() <= STALE_THRESHOLD) + .unwrap_or(false) + }) + .map(|ns| ns.prev_person_count) + .max() + .unwrap_or(0); + (None, Some(max_count)) + } + } +} + +/// Compute a person-presence score from fused amplitude data. +/// +/// Uses the squared coefficient of variation (variance / mean^2) as a +/// lightweight proxy for body-induced CSI perturbation. A flat amplitude +/// vector (no person) yields a score near zero; a vector with high variance +/// relative to its mean (person moving) yields a score approaching 1.0. +pub fn compute_person_score_from_amplitudes(amplitudes: &[f32]) -> f64 { + if amplitudes.is_empty() { + return 0.0; + } + + let n = amplitudes.len() as f64; + let sum: f64 = amplitudes.iter().map(|&a| a as f64).sum(); + let mean = sum / n; + + let variance: f64 = amplitudes.iter().map(|&a| { + let diff = (a as f64) - mean; + diff * diff + }).sum::<f64>() / n; + + let score = variance / (mean * mean + 1e-10); + score.clamp(0.0, 1.0) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::VecDeque; + + /// Helper: build a minimal NodeState for testing. Uses `NodeState::new()` + /// then mutates the `pub(crate)` fields the bridge needs.
+ fn make_node_state( + frame_history: VecDeque<Vec<f64>>, + last_frame_time: Option<Instant>, + prev_person_count: usize, + ) -> NodeState { + let mut ns = NodeState::new(); + ns.frame_history = frame_history; + ns.last_frame_time = last_frame_time; + ns.prev_person_count = prev_person_count; + ns + } + + #[test] + fn test_node_frame_from_empty_state() { + let ns = make_node_state(VecDeque::new(), Some(Instant::now()), 0); + assert!(node_frame_from_state(1, &ns).is_none()); + } + + #[test] + fn test_node_frame_from_state_no_time() { + let mut history = VecDeque::new(); + history.push_back(vec![1.0, 2.0, 3.0]); + let ns = make_node_state(history, None, 0); + assert!(node_frame_from_state(1, &ns).is_none()); + } + + #[test] + fn test_node_frame_conversion() { + let mut history = VecDeque::new(); + history.push_back(vec![10.0, 20.0, 30.5]); + let ns = make_node_state(history, Some(Instant::now()), 0); + + let frame = node_frame_from_state(42, &ns).expect("should produce a frame"); + assert_eq!(frame.node_id, 42); + assert_eq!(frame.channel_frames.len(), 1); + + let ch = &frame.channel_frames[0]; + assert_eq!(ch.amplitude.len(), 3); + assert!((ch.amplitude[0] - 10.0_f32).abs() < f32::EPSILON); + assert!((ch.amplitude[1] - 20.0_f32).abs() < f32::EPSILON); + assert!((ch.amplitude[2] - 30.5_f32).abs() < f32::EPSILON); + // Phase should be all zeros + assert!(ch.phase.iter().all(|&p| p == 0.0)); + assert_eq!(ch.hardware_type, HardwareType::Esp32S3); + } + + #[test] + fn test_stale_node_excluded() { + let mut states: HashMap<u8, NodeState> = HashMap::new(); + + // Active node: frame just received + let mut active_history = VecDeque::new(); + active_history.push_back(vec![1.0, 2.0]); + states.insert(1, make_node_state(active_history, Some(Instant::now()), 1)); + + // Stale node: frame 20 seconds ago + let mut stale_history = VecDeque::new(); + stale_history.push_back(vec![3.0, 4.0]); + let stale_time = Instant::now() - Duration::from_secs(20); + states.insert(2, make_node_state(stale_history,
Some(stale_time), 1)); + + let frames = node_frames_from_states(&states); + assert_eq!(frames.len(), 1, "stale node should be excluded"); + assert_eq!(frames[0].node_id, 1); + } + + #[test] + fn test_compute_person_score_empty() { + assert!((compute_person_score_from_amplitudes(&[]) - 0.0).abs() < f64::EPSILON); + } + + #[test] + fn test_compute_person_score_flat() { + // Constant amplitude => variance = 0 => score ~ 0 + let flat = vec![5.0_f32; 64]; + let score = compute_person_score_from_amplitudes(&flat); + assert!(score < 0.001, "flat signal should have near-zero score, got {score}"); + } + + #[test] + fn test_compute_person_score_varied() { + // High variance relative to mean should produce a positive score + let varied: Vec<f32> = (0..64).map(|i| if i % 2 == 0 { 1.0 } else { 10.0 }).collect(); + let score = compute_person_score_from_amplitudes(&varied); + assert!(score > 0.1, "varied signal should have positive score, got {score}"); + assert!(score <= 1.0, "score should be clamped to 1.0, got {score}"); + } + + #[test] + fn test_compute_person_score_clamped() { + // Near-zero mean with non-zero variance => would blow up without clamp + let vals = vec![0.0_f32, 0.0, 0.0, 0.001]; + let score = compute_person_score_from_amplitudes(&vals); + assert!(score <= 1.0, "score must be clamped to 1.0"); + } + + #[test] + fn test_fuse_or_fallback_empty() { + let fuser = MultistaticFuser::new(); + let states: HashMap<u8, NodeState> = HashMap::new(); + let (fused, count) = fuse_or_fallback(&fuser, &states); + assert!(fused.is_none()); + assert_eq!(count, Some(0)); + } +} diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/tracker_bridge.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/tracker_bridge.rs new file mode 100644 index 000000000..b66d0fcf4 --- /dev/null +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/tracker_bridge.rs @@ -0,0 +1,409 @@ +//!
Bridge between sensing-server PersonDetection types and signal crate PoseTracker. +//! +//! The sensing server uses f64 types (PersonDetection, PoseKeypoint, BoundingBox) +//! while the signal crate's PoseTracker operates on f32 Kalman states. This module +//! provides conversion functions and a single `tracker_update` entry point that +//! accepts server-side detections and returns tracker-smoothed results. + +use std::time::Instant; +use wifi_densepose_signal::ruvsense::{ + self, KeypointState, PoseTrack, TrackLifecycleState, TrackId, NUM_KEYPOINTS, +}; +use wifi_densepose_signal::ruvsense::pose_tracker::PoseTracker; + +use super::{BoundingBox, PersonDetection, PoseKeypoint}; + +/// COCO-17 keypoint names in index order. +const COCO_NAMES: [&str; 17] = [ + "nose", + "left_eye", + "right_eye", + "left_ear", + "right_ear", + "left_shoulder", + "right_shoulder", + "left_elbow", + "right_elbow", + "left_wrist", + "right_wrist", + "left_hip", + "right_hip", + "left_knee", + "right_knee", + "left_ankle", + "right_ankle", +]; + +/// Map a lowercase keypoint name to its COCO-17 index. +fn keypoint_name_to_coco_index(name: &str) -> Option { + COCO_NAMES.iter().position(|&n| n.eq_ignore_ascii_case(name)) +} + +/// Convert server-side PersonDetection slices into tracker-compatible keypoint arrays. +/// +/// For each person, maps named keypoints to COCO-17 positions. Unmapped slots are +/// filled with the centroid of the mapped keypoints so the Kalman filter has a +/// reasonable initial value rather than zeros. 
+fn detections_to_tracker_keypoints(persons: &[PersonDetection]) -> Vec<[[f32; 3]; 17]> { + persons + .iter() + .map(|person| { + let mut kps = [[0.0_f32; 3]; 17]; + let mut mapped_count = 0u32; + let mut cx = 0.0_f32; + let mut cy = 0.0_f32; + let mut cz = 0.0_f32; + + // First pass: place mapped keypoints and accumulate centroid + for kp in &person.keypoints { + if let Some(idx) = keypoint_name_to_coco_index(&kp.name) { + kps[idx] = [kp.x as f32, kp.y as f32, kp.z as f32]; + cx += kp.x as f32; + cy += kp.y as f32; + cz += kp.z as f32; + mapped_count += 1; + } + } + + // Compute centroid of mapped keypoints + let centroid = if mapped_count > 0 { + let n = mapped_count as f32; + [cx / n, cy / n, cz / n] + } else { + [0.0, 0.0, 0.0] + }; + + // Second pass: fill unmapped slots with centroid + // Build a set of mapped indices + let mut mapped = [false; 17]; + for kp in &person.keypoints { + if let Some(idx) = keypoint_name_to_coco_index(&kp.name) { + mapped[idx] = true; + } + } + for i in 0..17 { + if !mapped[i] { + kps[i] = centroid; + } + } + + kps + }) + .collect() +} + +/// Convert active PoseTracker tracks back into server-side PersonDetection values. +/// +/// Only tracks whose lifecycle `is_alive()` are included. 
+pub fn tracker_to_person_detections(tracker: &PoseTracker) -> Vec<PersonDetection> { + tracker + .active_tracks() + .into_iter() + .map(|track| { + let id = track.id.0 as u32; + + let confidence = match track.lifecycle { + TrackLifecycleState::Active => 0.9, + TrackLifecycleState::Tentative => 0.5, + TrackLifecycleState::Lost => 0.3, + TrackLifecycleState::Terminated => 0.0, + }; + + // Build keypoints from Kalman state + let keypoints: Vec<PoseKeypoint> = (0..NUM_KEYPOINTS) + .map(|i| { + let pos = track.keypoints[i].position(); + PoseKeypoint { + name: COCO_NAMES[i].to_string(), + x: pos[0] as f64, + y: pos[1] as f64, + z: pos[2] as f64, + confidence: track.keypoints[i].confidence as f64, + } + }) + .collect(); + + // Compute bounding box from observed keypoints only (confidence > 0). + // Unobserved slots (centroid-filled) collapse the bbox over time. + let mut min_x = f64::MAX; + let mut min_y = f64::MAX; + let mut max_x = f64::MIN; + let mut max_y = f64::MIN; + let mut observed = 0; + for kp in &keypoints { + if kp.confidence > 0.0 { + if kp.x < min_x { min_x = kp.x; } + if kp.y < min_y { min_y = kp.y; } + if kp.x > max_x { max_x = kp.x; } + if kp.y > max_y { max_y = kp.y; } + observed += 1; + } + } + + let bbox = if observed > 0 { + BoundingBox { + x: min_x, + y: min_y, + width: (max_x - min_x).max(0.01), + height: (max_y - min_y).max(0.01), + } + } else { + // No observed keypoints — use a default bbox at centroid + let cx = keypoints.iter().map(|k| k.x).sum::<f64>() / keypoints.len() as f64; + let cy = keypoints.iter().map(|k| k.y).sum::<f64>() / keypoints.len() as f64; + BoundingBox { x: cx - 0.3, y: cy - 0.5, width: 0.6, height: 1.0 } + }; + + PersonDetection { + id, + confidence, + keypoints, + bbox, + zone: "tracked".to_string(), + } + }) + .collect() +} + +/// Run one tracker cycle: predict, match detections, update, prune. +/// +/// This is the main entry point called each sensing frame. It: +/// 1. Computes dt from the previous call instant +/// 2.
Predicts all existing tracks forward +/// 3. Greedily assigns detections to tracks by Mahalanobis cost +/// 4. Updates matched tracks, creates new tracks for unmatched detections +/// 5. Prunes terminated tracks +/// 6. Returns smoothed PersonDetection values from the tracker state +pub fn tracker_update( + tracker: &mut PoseTracker, + last_instant: &mut Option<Instant>, + persons: Vec<PersonDetection>, +) -> Vec<PersonDetection> { + let now = Instant::now(); + let dt = last_instant.map_or(0.1_f32, |prev| now.duration_since(prev).as_secs_f32()); + *last_instant = Some(now); + + // Predict all tracks forward + tracker.predict_all(dt); + + if persons.is_empty() { + tracker.prune_terminated(); + return tracker_to_person_detections(tracker); + } + + // Convert detections to f32 keypoint arrays + let all_keypoints = detections_to_tracker_keypoints(&persons); + + // Compute centroids for each detection + let centroids: Vec<[f32; 3]> = all_keypoints + .iter() + .map(|kps| { + let mut c = [0.0_f32; 3]; + for kp in kps { + c[0] += kp[0]; + c[1] += kp[1]; + c[2] += kp[2]; + } + let n = NUM_KEYPOINTS as f32; + c[0] /= n; + c[1] /= n; + c[2] /= n; + c + }) + .collect(); + + // Greedy assignment: for each detection, find the best matching active track. + // Collect tracks once to avoid re-borrowing tracker per detection.
+ let active: Vec<(TrackId, [f32; 3])> = tracker.active_tracks().iter().map(|t| { + let centroid = { + let mut c = [0.0_f32; 3]; + for kp in &t.keypoints { + let p = kp.position(); + c[0] += p[0]; c[1] += p[1]; c[2] += p[2]; + } + let n = NUM_KEYPOINTS as f32; + [c[0] / n, c[1] / n, c[2] / n] + }; + (t.id, centroid) + }).collect(); + + let mut used_tracks: Vec<bool> = vec![false; active.len()]; + let mut matched: Vec<Option<TrackId>> = vec![None; persons.len()]; + + for det_idx in 0..persons.len() { + let mut best_cost = f32::MAX; + let mut best_track_idx = None; + + let active_refs = tracker.active_tracks(); + for (track_idx, track) in active_refs.iter().enumerate() { + if used_tracks[track_idx] { + continue; + } + let cost = tracker.assignment_cost(track, &centroids[det_idx], &[]); + if cost < best_cost { + best_cost = cost; + best_track_idx = Some(track_idx); + } + } + + // Mahalanobis gate: 9.0 (default TrackerConfig) + if best_cost < 9.0 { + if let Some(tidx) = best_track_idx { + matched[det_idx] = Some(active[tidx].0); + used_tracks[tidx] = true; + } + } + } + + // Timestamp for new/updated tracks (microseconds since UNIX epoch) + let timestamp_us = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_micros() as u64) + .unwrap_or(0); + + // Update matched tracks (uses update_keypoints for proper lifecycle transitions) + for (det_idx, track_id_opt) in matched.iter().enumerate() { + if let Some(track_id) = track_id_opt { + if let Some(track) = tracker.find_track_mut(*track_id) { + track.update_keypoints(&all_keypoints[det_idx], 0.08, 1.0, timestamp_us); + } + } + } + + // Create new tracks for unmatched detections + for (det_idx, track_id_opt) in matched.iter().enumerate() { + if track_id_opt.is_none() { + tracker.create_track(&all_keypoints[det_idx], timestamp_us); + } + } + + tracker.prune_terminated(); + tracker_to_person_detections(tracker) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_keypoint(name: &str, x: f64, y: f64, z: f64) ->
PoseKeypoint { + PoseKeypoint { + name: name.to_string(), + x, + y, + z, + confidence: 0.9, + } + } + + fn make_person(id: u32, keypoints: Vec<PoseKeypoint>) -> PersonDetection { + PersonDetection { + id, + confidence: 0.8, + keypoints, + bbox: BoundingBox { + x: 0.0, + y: 0.0, + width: 1.0, + height: 1.0, + }, + zone: "test".to_string(), + } + } + + #[test] + fn test_keypoint_name_to_coco_index() { + assert_eq!(keypoint_name_to_coco_index("nose"), Some(0)); + assert_eq!(keypoint_name_to_coco_index("left_eye"), Some(1)); + assert_eq!(keypoint_name_to_coco_index("right_eye"), Some(2)); + assert_eq!(keypoint_name_to_coco_index("left_ear"), Some(3)); + assert_eq!(keypoint_name_to_coco_index("right_ear"), Some(4)); + assert_eq!(keypoint_name_to_coco_index("left_shoulder"), Some(5)); + assert_eq!(keypoint_name_to_coco_index("right_shoulder"), Some(6)); + assert_eq!(keypoint_name_to_coco_index("left_elbow"), Some(7)); + assert_eq!(keypoint_name_to_coco_index("right_elbow"), Some(8)); + assert_eq!(keypoint_name_to_coco_index("left_wrist"), Some(9)); + assert_eq!(keypoint_name_to_coco_index("right_wrist"), Some(10)); + assert_eq!(keypoint_name_to_coco_index("left_hip"), Some(11)); + assert_eq!(keypoint_name_to_coco_index("right_hip"), Some(12)); + assert_eq!(keypoint_name_to_coco_index("left_knee"), Some(13)); + assert_eq!(keypoint_name_to_coco_index("right_knee"), Some(14)); + assert_eq!(keypoint_name_to_coco_index("left_ankle"), Some(15)); + assert_eq!(keypoint_name_to_coco_index("right_ankle"), Some(16)); + assert_eq!(keypoint_name_to_coco_index("unknown"), None); + // Case insensitive + assert_eq!(keypoint_name_to_coco_index("NOSE"), Some(0)); + assert_eq!(keypoint_name_to_coco_index("Left_Eye"), Some(1)); + } + + #[test] + fn test_detections_to_tracker_keypoints() { + let person = make_person( + 1, + vec![ + make_keypoint("nose", 1.0, 2.0, 0.5), + make_keypoint("left_shoulder", 0.8, 2.5, 0.4), + make_keypoint("right_shoulder", 1.2, 2.5, 0.6), + ], + ); + + let result =
detections_to_tracker_keypoints(&[person]); + assert_eq!(result.len(), 1); + + let kps = &result[0]; + + // Mapped keypoints should have correct values + assert!((kps[0][0] - 1.0).abs() < 1e-5); // nose x + assert!((kps[0][1] - 2.0).abs() < 1e-5); // nose y + assert!((kps[0][2] - 0.5).abs() < 1e-5); // nose z + + assert!((kps[5][0] - 0.8).abs() < 1e-5); // left_shoulder x + assert!((kps[6][0] - 1.2).abs() < 1e-5); // right_shoulder x + + // Unmapped keypoints should be at centroid of mapped keypoints + // centroid = ((1.0+0.8+1.2)/3, (2.0+2.5+2.5)/3, (0.5+0.4+0.6)/3) + let cx = (1.0 + 0.8 + 1.2) / 3.0; + let cy = (2.0 + 2.5 + 2.5) / 3.0; + let cz = (0.5 + 0.4 + 0.6) / 3.0; + + // left_eye (index 1) should be at centroid + assert!((kps[1][0] - cx).abs() < 1e-4); + assert!((kps[1][1] - cy).abs() < 1e-4); + assert!((kps[1][2] - cz).abs() < 1e-4); + } + + #[test] + fn test_tracker_update_stable_ids() { + let mut tracker = PoseTracker::new(); + let mut last_instant: Option<Instant> = None; + + let person = make_person( + 0, + vec![ + make_keypoint("nose", 1.0, 2.0, 0.0), + make_keypoint("left_shoulder", 0.8, 2.5, 0.0), + make_keypoint("right_shoulder", 1.2, 2.5, 0.0), + make_keypoint("left_hip", 0.9, 3.5, 0.0), + make_keypoint("right_hip", 1.1, 3.5, 0.0), + ], + ); + + // First update: creates a new track + let result1 = tracker_update(&mut tracker, &mut last_instant, vec![person.clone()]); + assert_eq!(result1.len(), 1); + let id1 = result1[0].id; + + // Second update: should match the existing track + let result2 = tracker_update(&mut tracker, &mut last_instant, vec![person.clone()]); + assert_eq!(result2.len(), 1); + let id2 = result2[0].id; + + // Third update: same track ID should persist + let result3 = tracker_update(&mut tracker, &mut last_instant, vec![person.clone()]); + assert_eq!(result3.len(), 1); + let id3 = result3[0].id; + + // All three updates should return the same track ID + assert_eq!(id1, id2, "Track ID should be stable across updates"); + assert_eq!(id2,
id3, "Track ID should be stable across updates"); + } +} diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/Cargo.toml b/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/Cargo.toml index 11114e9bb..b3c16e0dd 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/Cargo.toml +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/Cargo.toml @@ -11,6 +11,12 @@ keywords = ["wifi", "csi", "signal-processing", "densepose", "rust"] categories = ["science", "computer-vision"] readme = "README.md" +[features] +default = ["eigenvalue"] +## Enable eigenvalue-based person counting (requires BLAS via ndarray-linalg). +## Disable with --no-default-features to use the diagonal fallback instead. +eigenvalue = ["ndarray-linalg"] + [dependencies] # Core utilities thiserror.workspace = true @@ -20,6 +26,7 @@ chrono = { version = "0.4", features = ["serde"] } # Signal processing ndarray = { workspace = true } +ndarray-linalg = { workspace = true, optional = true } rustfft.workspace = true num-complex.workspace = true num-traits.workspace = true diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/src/ruvsense/field_model.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/src/ruvsense/field_model.rs index 7494235e0..028c772db 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/src/ruvsense/field_model.rs +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/src/ruvsense/field_model.rs @@ -17,6 +17,12 @@ //! of Squares and Products." Technometrics. //! - ADR-030: RuvSense Persistent Field Model +use ndarray::Array2; +#[cfg(feature = "eigenvalue")] +use ndarray_linalg::Eigh; +#[cfg(feature = "eigenvalue")] +use ndarray_linalg::UPLO; + // --------------------------------------------------------------------------- // Error types // --------------------------------------------------------------------------- @@ -47,6 +53,14 @@ pub enum FieldModelError { /// Invalid configuration parameter. 
#[error("Invalid configuration: {0}")] InvalidConfig(String), + + /// Model has not been calibrated yet. + #[error("Field model not calibrated")] + NotCalibrated, + + /// Not enough data for the requested operation. + #[error("Insufficient data: need {need}, have {have}")] + InsufficientData { need: usize, have: usize }, } // --------------------------------------------------------------------------- @@ -260,6 +274,8 @@ pub struct FieldNormalMode { pub calibrated_at_us: u64, /// Hash of mesh geometry at calibration time. pub geometry_hash: u64, + /// Baseline eigenvalue count above Marcenko-Pastur threshold (empty-room). + pub baseline_eigenvalue_count: usize, } /// Body perturbation extracted from a CSI observation. @@ -310,6 +326,60 @@ pub struct FieldModel { status: CalibrationStatus, /// Timestamp of last calibration completion (microseconds). last_calibration_us: u64, + /// Running outer-product sum for full covariance SVD: [n_sub x n_sub]. + covariance_sum: Option<Array2<f64>>, + /// Number of frames accumulated into covariance_sum. + covariance_count: u64, +} + +/// Diagonal variance fallback for when full covariance SVD is unavailable. +/// +/// Returns `(mode_energies, environmental_modes, baseline_eigenvalue_count)`.
+fn diagonal_fallback( + link_stats: &[LinkBaselineStats], + n_sc: usize, + n_modes: usize, +) -> (Vec<f64>, Vec<Vec<f64>>, usize) { + // Average variance across links (diagonal approximation) + let mut avg_variance = vec![0.0_f64; n_sc]; + for ls in link_stats { + let var = ls.variance_vector(); + for (i, v) in var.iter().enumerate() { + avg_variance[i] += v; + } + } + let n_links_f = link_stats.len() as f64; + if n_links_f > 0.0 { + for v in avg_variance.iter_mut() { + *v /= n_links_f; + } + } + + // Sort subcarrier indices by variance (descending) to pick top-K modes + let mut indices: Vec<usize> = (0..n_sc).collect(); + indices.sort_by(|&a, &b| { + avg_variance[b] + .partial_cmp(&avg_variance[a]) + .unwrap_or(std::cmp::Ordering::Equal) + }); + + let mut environmental_modes = Vec::with_capacity(n_modes); + let mut mode_energies = Vec::with_capacity(n_modes); + + for k in 0..n_modes.min(n_sc) { + let idx = indices[k]; + let mut mode = vec![0.0_f64; n_sc]; + mode[idx] = 1.0; + mode_energies.push(avg_variance[idx]); + environmental_modes.push(mode); + } + + // For diagonal fallback, estimate baseline eigenvalue count from variance + let total_var: f64 = avg_variance.iter().sum(); + let mean_var = if n_sc > 0 { total_var / n_sc as f64 } else { 0.0 }; + let baseline_count = avg_variance.iter().filter(|&&v| v > mean_var * 2.0).count(); + + (mode_energies, environmental_modes, baseline_count) } impl FieldModel { @@ -339,6 +409,8 @@ impl FieldModel { modes: None, status: CalibrationStatus::Uncalibrated, last_calibration_us: 0, + covariance_sum: None, + covariance_count: 0, }) } @@ -375,6 +447,30 @@ impl FieldModel { if self.status == CalibrationStatus::Uncalibrated { self.status = CalibrationStatus::Collecting; } + + // Accumulate raw outer products for SVD covariance (no centering here; + // mean subtraction is deferred to finalize_calibration to avoid bias). + // Outer products are summed across links; covariance_count tracks frames, not links.
+ let n = self.config.n_subcarriers; + let cov = self.covariance_sum.get_or_insert_with(|| Array2::zeros((n, n))); + for obs in observations { + if obs.len() >= n { + // Rank-1 update: cov += obs * obs^T (raw, un-centered) + for i in 0..n { + for j in i..n { + let val = obs[i] * obs[j]; + cov[[i, j]] += val; + if i != j { + cov[[j, i]] += val; + } + } + } + } + } + // Count once per frame (not per link) for correct MP ratio + self.covariance_count += 1; + Ok(()) } @@ -396,58 +492,134 @@ impl FieldModel { }); } - // Build covariance matrix from per-link variance data. - // We average the variance vectors across all links to get the - // covariance diagonal, then compute eigenmodes via power iteration. let n_sc = self.config.n_subcarriers; let n_modes = self.config.n_modes.min(n_sc); // Collect per-link baselines let baseline: Vec<Vec<f64>> = self.link_stats.iter().map(|ls| ls.mean_vector()).collect(); - // Average covariance across links (diagonal approximation) - let mut avg_variance = vec![0.0_f64; n_sc]; - for ls in &self.link_stats { - let var = ls.variance_vector(); - for (i, v) in var.iter().enumerate() { - avg_variance[i] += v; + // --- True eigenvalue decomposition (with diagonal fallback) --- + let (mode_energies, environmental_modes, baseline_eig_count) = + if let Some(ref cov_sum) = self.covariance_sum { + if self.covariance_count > 1 { + // Compute sample covariance from raw outer products: + // cov = (sum_xx / N - mean * mean^T) * N / (N-1) + // where sum_xx accumulated obs * obs^T across all links per frame. + // We average per-link means for centering.
+ let n_frames = self.covariance_count as f64; + let n_links = self.config.n_links as f64; + // Average mean across all links + let mut avg_mean = vec![0.0f64; n_sc]; + for ls in &self.link_stats { + let m = ls.mean_vector(); + for i in 0..n_sc { avg_mean[i] += m[i]; } + } + for i in 0..n_sc { avg_mean[i] /= n_links; } + // cov = sum_xx / (N * n_links) - mean * mean^T, then Bessel correction + let total_obs = n_frames * n_links; + let mut covariance = cov_sum / total_obs; + for i in 0..n_sc { + for j in 0..n_sc { + covariance[[i, j]] -= avg_mean[i] * avg_mean[j]; + } + } + // Bessel's correction: multiply by N/(N-1) where N = total observations + let bessel = total_obs / (total_obs - 1.0); + covariance *= bessel; + + // Symmetric eigendecomposition (requires eigenvalue feature / BLAS) + #[cfg(feature = "eigenvalue")] + match covariance.eigh(UPLO::Upper) { + Ok((eigenvalues, eigenvectors)) => { + // eigenvalues come back in ascending order from ndarray-linalg; + // sort indices to get descending order + let len = eigenvalues.len(); + let mut sorted_indices: Vec<usize> = (0..len).collect(); + sorted_indices.sort_by(|&a, &b| { + eigenvalues[b] + .partial_cmp(&eigenvalues[a]) + .unwrap_or(std::cmp::Ordering::Equal) + }); + + // Extract top n_modes + let modes: Vec<Vec<f64>> = sorted_indices + .iter() + .take(n_modes) + .map(|&idx| eigenvectors.column(idx).to_vec()) + .collect(); + let energies: Vec<f64> = sorted_indices + .iter() + .take(n_modes) + .map(|&idx| eigenvalues[idx].max(0.0)) + .collect(); + + // Marcenko-Pastur noise estimate: mean of the POSITIVE + // eigenvalues in the bottom half. Excludes zeros from + // rank-deficient matrices (when p > n).
+ let noise_var = { + let mut positive: Vec<f64> = eigenvalues + .iter().copied().filter(|&e| e > 1e-10).collect(); + positive.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + if positive.len() >= 4 { + let half = positive.len() / 2; + positive[..half].iter().sum::<f64>() / half as f64 + } else if !positive.is_empty() { + positive[0] + } else { + 1e-10 + } + }; + // MP ratio: p/n where n = total observations (frames * links) + let total_obs_mp = self.covariance_count as f64 * self.config.n_links as f64; + let ratio = n_sc as f64 / total_obs_mp; + let mp_threshold = noise_var * (1.0 + ratio.sqrt()).powi(2); + let baseline_count = eigenvalues + .iter() + .filter(|&&ev| ev > mp_threshold) + .count(); + + (energies, modes, baseline_count) + } + Err(_) => { + // Fallback to diagonal approximation on SVD failure + diagonal_fallback(&self.link_stats, n_sc, n_modes) + } + } + // When eigenvalue feature is disabled, use diagonal fallback + #[cfg(not(feature = "eigenvalue"))] + { diagonal_fallback(&self.link_stats, n_sc, n_modes) } + } else { + diagonal_fallback(&self.link_stats, n_sc, n_modes) + } + } else { + diagonal_fallback(&self.link_stats, n_sc, n_modes) + }; + + // Compute variance explained using the same centered covariance as modes. + // total_variance = trace(centered_covariance) = sum of ALL eigenvalues.
+ let total_energy: f64 = mode_energies.iter().sum(); + let total_variance = if let Some(ref cov_sum) = self.covariance_sum { + if self.covariance_count > 1 { + let n_links_f = self.config.n_links as f64; + let total_obs = self.covariance_count as f64 * n_links_f; + // Centered trace: E[x^2] - E[x]^2, with Bessel correction + let mut avg_mean = vec![0.0f64; n_sc]; + for ls in &self.link_stats { + let m = ls.mean_vector(); + for i in 0..n_sc { avg_mean[i] += m[i]; } + } + for i in 0..n_sc { avg_mean[i] /= n_links_f; } + let raw_trace: f64 = (0..n_sc).map(|i| cov_sum[[i, i]] / total_obs).sum(); + let mean_sq: f64 = avg_mean.iter().map(|m| m * m).sum(); + (raw_trace - mean_sq).max(0.0) * total_obs / (total_obs - 1.0) + } else { + total_energy } - } - let n_links_f = self.config.n_links as f64; - for v in avg_variance.iter_mut() { - *v /= n_links_f; - } - - // Extract modes via simplified power iteration on the diagonal - // covariance. Since we use a diagonal approximation, the eigenmodes - // are aligned with the standard basis, sorted by variance. 
- let total_variance: f64 = avg_variance.iter().sum(); - - // Sort subcarrier indices by variance (descending) to pick top-K modes - let mut indices: Vec<usize> = (0..n_sc).collect(); - indices.sort_by(|&a, &b| { - avg_variance[b] - .partial_cmp(&avg_variance[a]) - .unwrap_or(std::cmp::Ordering::Equal) - }); - - let mut environmental_modes = Vec::with_capacity(n_modes); - let mut mode_energies = Vec::with_capacity(n_modes); - let mut explained = 0.0_f64; - - for k in 0..n_modes { - let idx = indices[k]; - // Create a unit vector along the highest-variance subcarrier - let mut mode = vec![0.0_f64; n_sc]; - mode[idx] = 1.0; - let energy = avg_variance[idx]; - environmental_modes.push(mode); - mode_energies.push(energy); - explained += energy; - } - + } else { + total_energy + }; let variance_explained = if total_variance > 1e-15 { - explained / total_variance + total_energy / total_variance } else { 0.0 }; @@ -459,6 +631,7 @@ impl FieldModel { variance_explained, calibrated_at_us: timestamp_us, geometry_hash, + baseline_eigenvalue_count: baseline_eig_count, }; self.modes = Some(field_mode); @@ -541,6 +714,100 @@ impl FieldModel { }) } + /// Estimate room occupancy from eigenvalue analysis of recent CSI frames. + /// + /// `recent_frames`: sliding window of amplitude vectors (recommend 50 frames + /// ~ 2.5s at 20 Hz). Returns estimated person count (0 = empty room). + /// + /// Requires the `eigenvalue` feature (BLAS). Returns `NotCalibrated` when + /// the feature is disabled.
+ #[cfg(feature = "eigenvalue")] + pub fn estimate_occupancy(&self, recent_frames: &[Vec]) -> Result { + let modes = self.modes.as_ref().ok_or(FieldModelError::NotCalibrated)?; + + let n = self.config.n_subcarriers; + if recent_frames.len() < 10 { + return Err(FieldModelError::InsufficientData { + need: 10, + have: recent_frames.len(), + }); + } + + // Build covariance matrix from recent frames + let mut mean = vec![0.0f64; n]; + let mut count = 0usize; + for frame in recent_frames { + if frame.len() >= n { + for i in 0..n { + mean[i] += frame[i]; + } + count += 1; + } + } + if count < 2 { + return Ok(0); + } + for m in &mut mean { + *m /= count as f64; + } + + let mut cov = Array2::::zeros((n, n)); + for frame in recent_frames { + if frame.len() >= n { + for i in 0..n { + let ci = frame[i] - mean[i]; + for j in i..n { + let val = ci * (frame[j] - mean[j]); + cov[[i, j]] += val; + if i != j { + cov[[j, i]] += val; + } + } + } + } + } + let scale = 1.0 / (count as f64 - 1.0); + cov *= scale; + + // Eigendecompose + let eigenvalues = match cov.eigh(UPLO::Upper) { + Ok((evals, _)) => evals, + Err(_) => return Ok(0), // SVD failure = can't estimate + }; + + // Marcenko-Pastur noise estimate: median of POSITIVE eigenvalues + // in the bottom half. Excludes zeros from rank-deficient matrices + // (common when n_subcarriers > n_frames, e.g. 56 subcarriers / 50 frames). 
+ let noise_var = { + let mut positive: Vec<f64> = eigenvalues.iter() + .copied() + .filter(|&e| e > 1e-10) + .collect(); + positive.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + if positive.len() >= 4 { + let half = positive.len() / 2; + positive[..half].iter().sum::<f64>() / half as f64 + } else if !positive.is_empty() { + positive[0] + } else { + return Ok(0); // All zero eigenvalues — can't estimate + } + }; + let ratio = n as f64 / count as f64; + let mp_threshold = noise_var * (1.0 + ratio.sqrt()).powi(2); + + let significant = eigenvalues.iter().filter(|&&ev| ev > mp_threshold).count(); + let occupancy = significant.saturating_sub(modes.baseline_eigenvalue_count); + + Ok(occupancy.min(10)) // Cap at 10 persons + } + + /// Stub when eigenvalue feature is disabled — always returns NotCalibrated. + #[cfg(not(feature = "eigenvalue"))] + pub fn estimate_occupancy(&self, _recent_frames: &[Vec<f64>]) -> Result<usize, FieldModelError> { + Err(FieldModelError::NotCalibrated) + } + /// Check calibration freshness against a given timestamp.
pub fn check_freshness(&self, current_us: u64) -> CalibrationStatus { if self.modes.is_none() { @@ -563,6 +830,8 @@ impl FieldModel { .collect(); self.modes = None; self.status = CalibrationStatus::Uncalibrated; + self.covariance_sum = None; + self.covariance_count = 0; } } @@ -873,6 +1142,179 @@ mod tests { } } + #[test] + fn test_covariance_accumulation() { + let config = make_config(2, 4, 5); + let mut model = FieldModel::new(config).unwrap(); + + // Feed calibration data + for i in 0..10 { + let obs = make_observations(2, 4, 1.0 + 0.1 * i as f64); + model.feed_calibration(&obs).unwrap(); + } + + // covariance_sum should be populated + assert!(model.covariance_sum.is_some()); + assert!(model.covariance_count > 0); + let cov = model.covariance_sum.as_ref().unwrap(); + assert_eq!(cov.shape(), &[4, 4]); + // Diagonal entries should be non-negative (sum of squares) + for i in 0..4 { + assert!(cov[[i, i]] >= 0.0, "Diagonal covariance entry must be >= 0"); + } + // Matrix should be symmetric + for i in 0..4 { + for j in 0..4 { + assert!( + (cov[[i, j]] - cov[[j, i]]).abs() < 1e-10, + "Covariance matrix must be symmetric" + ); + } + } + } + + #[test] + fn test_svd_finalize_produces_orthonormal_modes() { + let config = FieldModelConfig { + n_links: 1, + n_subcarriers: 8, + n_modes: 3, + min_calibration_frames: 20, + baseline_expiry_s: 86_400.0, + }; + let mut model = FieldModel::new(config).unwrap(); + + // Feed frames with correlated subcarrier patterns to produce + // non-trivial eigenmodes + for i in 0..50 { + let t = i as f64 * 0.1; + let obs = vec![vec![ + 1.0 + t.sin(), + 2.0 + t.cos(), + 3.0 + 0.5 * t.sin(), + 4.0 + 0.3 * t.cos(), + 5.0 + 0.1 * t, + 6.0, + 7.0 + 0.2 * (2.0 * t).sin(), + 8.0 + 0.1 * (2.0 * t).cos(), + ]]; + model.feed_calibration(&obs).unwrap(); + } + model.finalize_calibration(1_000_000, 0).unwrap(); + + let modes = model.modes().unwrap(); + // Each mode should be approximately unit length + for (k, mode) in 
modes.environmental_modes.iter().enumerate() { + let norm: f64 = mode.iter().map(|x| x * x).sum::<f64>().sqrt(); + assert!( + (norm - 1.0).abs() < 0.01, + "Mode {} has norm {} (expected ~1.0)", + k, + norm + ); + } + // Modes should be approximately orthogonal + for i in 0..modes.environmental_modes.len() { + for j in (i + 1)..modes.environmental_modes.len() { + let dot: f64 = modes.environmental_modes[i] + .iter() + .zip(modes.environmental_modes[j].iter()) + .map(|(a, b)| a * b) + .sum(); + assert!( + dot.abs() < 0.05, + "Modes {} and {} have dot product {} (expected ~0)", + i, + j, + dot + ); + } + } + } + + #[test] + fn test_estimate_occupancy_noise_only() { + let config = FieldModelConfig { + n_links: 1, + n_subcarriers: 8, + n_modes: 3, + min_calibration_frames: 20, + baseline_expiry_s: 86_400.0, + }; + let mut model = FieldModel::new(config).unwrap(); + + // Calibrate with some deterministic noise-like pattern + for i in 0..50 { + let t = i as f64 * 0.1; + let obs = vec![vec![ + 1.0 + 0.01 * t.sin(), + 2.0 + 0.01 * t.cos(), + 3.0 + 0.01 * (2.0 * t).sin(), + 4.0 + 0.01 * (2.0 * t).cos(), + 5.0 + 0.01 * (3.0 * t).sin(), + 6.0 + 0.01 * (3.0 * t).cos(), + 7.0 + 0.01 * (4.0 * t).sin(), + 8.0 + 0.01 * (4.0 * t).cos(), + ]]; + model.feed_calibration(&obs).unwrap(); + } + model.finalize_calibration(1_000_000, 0).unwrap(); + + // Estimate occupancy with similar noise-only frames + let frames: Vec<Vec<f64>> = (0..20) + .map(|i| { + let t = (i + 50) as f64 * 0.1; + vec![ + 1.0 + 0.01 * t.sin(), + 2.0 + 0.01 * t.cos(), + 3.0 + 0.01 * (2.0 * t).sin(), + 4.0 + 0.01 * (2.0 * t).cos(), + 5.0 + 0.01 * (3.0 * t).sin(), + 6.0 + 0.01 * (3.0 * t).cos(), + 7.0 + 0.01 * (4.0 * t).sin(), + 8.0 + 0.01 * (4.0 * t).cos(), + ] + }) + .collect(); + let occupancy = model.estimate_occupancy(&frames).unwrap(); + assert_eq!(occupancy, 0, "Noise-only frames should yield 0 occupancy"); + } + + #[test] + fn test_baseline_eigenvalue_count_stored() { + let config = FieldModelConfig { + n_links: 1,
n_subcarriers: 8, + n_modes: 3, + min_calibration_frames: 20, + baseline_expiry_s: 86_400.0, + }; + let mut model = FieldModel::new(config).unwrap(); + + // Feed frames with structured variance so eigenvalues are meaningful + for i in 0..50 { + let t = i as f64 * 0.1; + let obs = vec![vec![ + 1.0 + t.sin(), + 2.0 + t.cos(), + 3.0 + 0.5 * t.sin(), + 4.0 + 0.3 * t.cos(), + 5.0 + 0.1 * t, + 6.0, + 7.0, + 8.0, + ]]; + model.feed_calibration(&obs).unwrap(); + } + let modes = model.finalize_calibration(1_000_000, 0).unwrap(); + // baseline_eigenvalue_count should exist and be a reasonable value + // (at least 0, at most n_subcarriers) + assert!( + modes.baseline_eigenvalue_count <= 8, + "baseline_eigenvalue_count should be <= n_subcarriers" + ); + } + #[test] fn test_environmental_projection_removes_drift() { let config = make_config(1, 4, 10); diff --git a/ui/components/SensingTab.js b/ui/components/SensingTab.js index 6c3115c12..33387eefe 100644 --- a/ui/components/SensingTab.js +++ b/ui/components/SensingTab.js @@ -110,12 +110,18 @@ export class SensingTab {
              About This Data

              Metrics are computed from WiFi Channel State Information (CSI).
-             With 1 ESP32 you get presence detection, breathing
+             With <span id="sensingNodeCount">0</span> ESP32 node(s) you get presence detection, breathing
              estimation, and gross motion. Add 3-4+ ESP32 nodes around the
              room for spatial resolution and limb-level tracking.

+
+             NODE STATUS
+             <div id="nodeStatusContainer"></div>
+
              Details
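The fields this panel reads from each `node_features` entry (`node_id`, `stale`, `rssi_dbm`, and the nested `classification`) can be summarized with a small Python sketch; `node_row_label` is a hypothetical helper, not part of the repo, but the field names and fallbacks mirror what `_updateNodePanels` does:

```python
def node_row_label(nf):
    """Format one node_features entry the way the JS node panel does.

    Hypothetical helper; defaults (-80 dBm, "absent", 0 confidence)
    mirror the fallbacks used in _updateNodePanels.
    """
    status = "STALE" if nf.get("stale") else "ACTIVE"
    rssi = nf.get("rssi_dbm", -80)
    cls = nf.get("classification") or {}
    motion = (cls.get("motion_level") or "absent").upper()
    conf = round((cls.get("confidence") or 0) * 100)
    return f"Node {nf['node_id']} [{status}] {rssi:.0f} dBm {motion} {conf}%"
```

A fully populated entry renders as e.g. `Node 2 [ACTIVE] -55 dBm WALKING 87%`, while a stale entry with no classification falls back to `ABSENT 0%`.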
@@ -193,6 +199,9 @@ export class SensingTab { // Update HUD this._updateHUD(data); + + // Update per-node panels + this._updateNodePanels(data); } _onStateChange(state) { @@ -233,6 +242,11 @@ export class SensingTab { const f = data.features || {}; const c = data.classification || {}; + // Node count + const nodeCount = (data.nodes || []).length; + const countEl = this.container.querySelector('#sensingNodeCount'); + if (countEl) countEl.textContent = String(nodeCount); + // RSSI this._setText('sensingRssi', `${(f.mean_rssi || -80).toFixed(1)} dBm`); this._setText('sensingSource', data.source || ''); @@ -309,6 +323,57 @@ export class SensingTab { ctx.stroke(); } + // ---- Per-node panels --------------------------------------------------- + + _updateNodePanels(data) { + const container = this.container.querySelector('#nodeStatusContainer'); + if (!container) return; + const nodeFeatures = data.node_features || []; + if (nodeFeatures.length === 0) { + container.textContent = ''; + const msg = document.createElement('div'); + msg.style.cssText = 'color:#888;font-size:12px;padding:8px;'; + msg.textContent = 'No nodes detected'; + container.appendChild(msg); + return; + } + const NODE_COLORS = ['#00ccff', '#ff6600', '#00ff88', '#ff00cc', '#ffcc00', '#8800ff', '#00ffcc', '#ff0044']; + container.textContent = ''; + for (const nf of nodeFeatures) { + const color = NODE_COLORS[nf.node_id % NODE_COLORS.length]; + const statusColor = nf.stale ? 
'#888' : '#0f0'; + + const row = document.createElement('div'); + row.style.cssText = `display:flex;align-items:center;gap:8px;padding:6px 8px;margin-bottom:4px;background:rgba(255,255,255,0.03);border-radius:6px;border-left:3px solid ${color};`; + + const idCol = document.createElement('div'); + idCol.style.minWidth = '50px'; + const nameEl = document.createElement('div'); + nameEl.style.cssText = `font-size:11px;font-weight:600;color:${color};`; + nameEl.textContent = 'Node ' + nf.node_id; + const statusEl = document.createElement('div'); + statusEl.style.cssText = `font-size:9px;color:${statusColor};`; + statusEl.textContent = nf.stale ? 'STALE' : 'ACTIVE'; + idCol.appendChild(nameEl); + idCol.appendChild(statusEl); + + const metricsCol = document.createElement('div'); + metricsCol.style.cssText = 'flex:1;font-size:10px;color:#aaa;'; + metricsCol.textContent = (nf.rssi_dbm || -80).toFixed(0) + ' dBm · var ' + (nf.features?.variance || 0).toFixed(1); + + const classCol = document.createElement('div'); + classCol.style.cssText = 'font-size:10px;font-weight:600;color:#ccc;'; + const motion = (nf.classification?.motion_level || 'absent').toUpperCase(); + const conf = ((nf.classification?.confidence || 0) * 100).toFixed(0); + classCol.textContent = motion + ' ' + conf + '%'; + + row.appendChild(idCol); + row.appendChild(metricsCol); + row.appendChild(classCol); + container.appendChild(row); + } + } + // ---- Resize ------------------------------------------------------------ _setupResize() { diff --git a/ui/components/gaussian-splats.js b/ui/components/gaussian-splats.js index ecab6e481..5f7227fa3 100644 --- a/ui/components/gaussian-splats.js +++ b/ui/components/gaussian-splats.js @@ -66,6 +66,10 @@ function valueToColor(v) { return [r, g, b]; } +// ---- Node marker color palette ------------------------------------------- + +const NODE_MARKER_COLORS = [0x00ccff, 0xff6600, 0x00ff88, 0xff00cc, 0xffcc00, 0x8800ff, 0x00ffcc, 0xff0044]; + // ---- GaussianSplatRenderer 
----------------------------------------------- export class GaussianSplatRenderer { @@ -108,6 +112,10 @@ export class GaussianSplatRenderer { // Node markers (ESP32 / router positions) this._createNodeMarkers(THREE); + // Dynamic per-node markers (multi-node support) + this.nodeMarkers = new Map(); // nodeId -> THREE.Mesh + this._THREE = THREE; + // Body disruption blob this._createBodyBlob(THREE); @@ -369,11 +377,43 @@ export class GaussianSplatRenderer { bGeo.attributes.splatSize.needsUpdate = true; } - // -- Update node positions --------------------------------------------- + // -- Update node positions (legacy single-node) ------------------------ if (nodes.length > 0 && nodes[0].position) { const pos = nodes[0].position; this.nodeMarker.position.set(pos[0], 0.5, pos[2]); } + + // -- Update dynamic per-node markers (multi-node support) -------------- + if (nodes && nodes.length > 0 && this.scene) { + const THREE = this._THREE || window.THREE; + if (THREE) { + const activeIds = new Set(); + for (const node of nodes) { + activeIds.add(node.node_id); + if (!this.nodeMarkers.has(node.node_id)) { + const geo = new THREE.SphereGeometry(0.25, 16, 16); + const mat = new THREE.MeshBasicMaterial({ + color: NODE_MARKER_COLORS[node.node_id % NODE_MARKER_COLORS.length], + transparent: true, + opacity: 0.8, + }); + const marker = new THREE.Mesh(geo, mat); + this.scene.add(marker); + this.nodeMarkers.set(node.node_id, marker); + } + const marker = this.nodeMarkers.get(node.node_id); + const pos = node.position || [0, 0, 0]; + marker.position.set(pos[0], 0.5, pos[2]); + } + // Remove stale markers + for (const [id, marker] of this.nodeMarkers) { + if (!activeIds.has(id)) { + this.scene.remove(marker); + this.nodeMarkers.delete(id); + } + } + } + } } // ---- Render loop ------------------------------------------------------- diff --git a/ui/services/sensing.service.js b/ui/services/sensing.service.js index 4931e86e2..0992483bc 100644 --- a/ui/services/sensing.service.js +++ 
b/ui/services/sensing.service.js @@ -84,6 +84,11 @@ class SensingService { return [...this._rssiHistory]; } + /** Get per-node RSSI history (object keyed by node_id). */ + getPerNodeRssiHistory() { + return { ...(this._perNodeRssiHistory || {}) }; + } + /** Current connection state. */ get state() { return this._state; @@ -327,6 +332,20 @@ class SensingService { } } + // Per-node RSSI tracking + if (!this._perNodeRssiHistory) this._perNodeRssiHistory = {}; + if (data.node_features) { + for (const nf of data.node_features) { + if (!this._perNodeRssiHistory[nf.node_id]) { + this._perNodeRssiHistory[nf.node_id] = []; + } + this._perNodeRssiHistory[nf.node_id].push(nf.rssi_dbm); + if (this._perNodeRssiHistory[nf.node_id].length > this._maxHistory) { + this._perNodeRssiHistory[nf.node_id].shift(); + } + } + } + // Notify all listeners for (const cb of this._listeners) { try { diff --git a/wifi_densepose/__init__.py b/wifi_densepose/__init__.py new file mode 100644 index 000000000..83d6e204f --- /dev/null +++ b/wifi_densepose/__init__.py @@ -0,0 +1,137 @@ +""" +WiFi-DensePose — WiFi-based human pose estimation using CSI data. + +Usage: + from wifi_densepose import WiFiDensePose + + system = WiFiDensePose() + system.start() + poses = system.get_latest_poses() + system.stop() +""" + +__version__ = "1.2.0" + +import sys +import os +import logging + +logger = logging.getLogger(__name__) + +# Allow importing the v1 src package when installed from the repo +_v1_src = os.path.join(os.path.dirname(os.path.dirname(__file__)), "v1") +if os.path.isdir(_v1_src) and _v1_src not in sys.path: + sys.path.insert(0, _v1_src) + + +class WiFiDensePose: + """High-level facade for the WiFi-DensePose sensing system. + + This is the primary entry point documented in the README Quick Start. + It wraps the underlying ServiceOrchestrator and exposes a simple + start / get_latest_poses / stop interface. 
+ """ + + def __init__(self, host: str = "0.0.0.0", port: int = 3000, **kwargs): + self.host = host + self.port = port + self._config = kwargs + self._orchestrator = None + self._server_task = None + self._poses = [] + self._running = False + + # ------------------------------------------------------------------ + # Public API (matches README Quick Start) + # ------------------------------------------------------------------ + + def start(self): + """Start the sensing system (blocking until ready).""" + import asyncio + + loop = _get_or_create_event_loop() + loop.run_until_complete(self._async_start()) + + async def _async_start(self): + try: + from src.config.settings import get_settings + from src.services.orchestrator import ServiceOrchestrator + + settings = get_settings() + self._orchestrator = ServiceOrchestrator(settings) + await self._orchestrator.initialize() + await self._orchestrator.start() + self._running = True + logger.info("WiFiDensePose system started on %s:%s", self.host, self.port) + except ImportError: + raise ImportError( + "Core dependencies not found. Make sure you installed " + "from the repository root:\n" + " cd wifi-densepose && pip install -e .\n" + "Or install the v1 package:\n" + " cd wifi-densepose/v1 && pip install -e ." 
+ ) + + def stop(self): + """Stop the sensing system.""" + if self._orchestrator is not None: + loop = _get_or_create_event_loop() + loop.run_until_complete(self._orchestrator.shutdown()) + self._running = False + logger.info("WiFiDensePose system stopped") + + def get_latest_poses(self): + """Return the most recent list of detected pose dicts.""" + if self._orchestrator is None: + return [] + try: + loop = _get_or_create_event_loop() + return loop.run_until_complete(self._fetch_poses()) + except Exception: + return [] + + async def _fetch_poses(self): + try: + pose_svc = self._orchestrator.pose_service + if pose_svc and hasattr(pose_svc, "get_latest"): + return await pose_svc.get_latest() + except Exception: + pass + return [] + + # ------------------------------------------------------------------ + # Context-manager support + # ------------------------------------------------------------------ + + def __enter__(self): + self.start() + return self + + def __exit__(self, *exc): + self.stop() + + # ------------------------------------------------------------------ + # Convenience re-exports + # ------------------------------------------------------------------ + + @staticmethod + def version(): + return __version__ + + +def _get_or_create_event_loop(): + import asyncio + + try: + return asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop + + +__all__ = ["WiFiDensePose", "__version__"]
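The occupancy logic the Rust patch adds (noise floor from the bottom half of the positive spectrum, a Marcenko-Pastur bulk edge, then baseline subtraction) can be sketched in plain Python. `mp_occupancy` and its argument names are illustrative, not part of the crate:

```python
import math

def mp_occupancy(eigenvalues, n_subcarriers, n_frames, baseline_count):
    """Sketch of estimate_occupancy(): count eigenvalues above the
    Marcenko-Pastur bulk edge, then subtract the empty-room baseline."""
    # Noise variance: mean of the bottom half of the positive eigenvalues
    positive = sorted(e for e in eigenvalues if e > 1e-10)
    if not positive:
        return 0
    if len(positive) >= 4:
        half = len(positive) // 2
        noise_var = sum(positive[:half]) / half
    else:
        noise_var = positive[0]
    # MP upper edge for aspect ratio p/n, scaled by the noise variance
    ratio = n_subcarriers / n_frames
    threshold = noise_var * (1.0 + math.sqrt(ratio)) ** 2
    significant = sum(1 for e in eigenvalues if e > threshold)
    # Baseline subtraction, capped at 10 persons as in the Rust code
    return min(max(significant - baseline_count, 0), 10)
```

Estimating the noise floor from only the lower half of the spectrum keeps large person-induced eigenvalues from inflating the threshold, which is why a near-flat spectrum yields zero occupancy.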