diff --git a/firmware/esp32-csi-node/main/edge_processing.c b/firmware/esp32-csi-node/main/edge_processing.c index 1cd74a65d..0911f3831 100644 --- a/firmware/esp32-csi-node/main/edge_processing.c +++ b/firmware/esp32-csi-node/main/edge_processing.c @@ -41,14 +41,12 @@ static const char *TAG = "edge_proc"; * ====================================================================== */ static edge_ring_buf_t s_ring; -static uint32_t s_ring_drops; /* Frames dropped due to full ring buffer. */ static inline bool ring_push(const uint8_t *iq, uint16_t len, int8_t rssi, uint8_t channel) { uint32_t next = (s_ring.head + 1) % EDGE_RING_SLOTS; if (next == s_ring.tail) { - s_ring_drops++; return false; /* Full — drop frame. */ } @@ -790,13 +788,12 @@ static void process_frame(const edge_ring_slot_t *slot) if ((s_frame_count % 200) == 0) { ESP_LOGI(TAG, "Vitals: br=%.1f hr=%.1f motion=%.4f pres=%s " - "fall=%s persons=%u frames=%lu drops=%lu", + "fall=%s persons=%u frames=%lu", s_breathing_bpm, s_heartrate_bpm, s_motion_energy, s_presence_detected ? "YES" : "no", s_fall_detected ? "YES" : "no", (unsigned)s_latest_pkt.n_persons, - (unsigned long)s_frame_count, - (unsigned long)s_ring_drops); + (unsigned long)s_frame_count); } } @@ -834,32 +831,18 @@ static void edge_task(void *arg) edge_ring_slot_t slot; - /* Maximum frames to process before a longer yield. On busy LANs - * (corporate networks, many APs), the ring buffer fills continuously. - * Without a batch limit the task processes frames back-to-back with - * only 1-tick yields, which on high frame rates can still starve - * IDLE1 enough to trip the 5-second task watchdog. See #266, #321. */ - const uint8_t BATCH_LIMIT = 4; - while (1) { - uint8_t processed = 0; - - while (processed < BATCH_LIMIT && ring_pop(&slot)) { + if (ring_pop(&slot)) { process_frame(&slot); - processed++; - /* 1-tick yield between frames within a batch. */ + /* Yield after every frame to feed the Core 1 watchdog. + * process_frame() is CPU-intensive (biquad filters, Welford stats, + * BPM estimation, multi-person vitals) and can take several ms. + * Without this yield, edge_dsp at priority 5 starves IDLE1 at + * priority 0, triggering the task watchdog. See issue #266. */ vTaskDelay(1); - } - - if (processed > 0) { - /* Post-batch yield: 2 ticks (~20 ms at 100 Hz) so IDLE1 can - * run and feed the Core 1 watchdog even under sustained load. - * This is intentionally longer than the 1-tick inter-frame yield. */ - vTaskDelay(2); } else { - /* No frames available — sleep one full tick. - * NOTE: pdMS_TO_TICKS(5) == 0 at 100 Hz, which would busy-spin. */ - vTaskDelay(1); + /* No frames available — yield briefly. */ + vTaskDelay(pdMS_TO_TICKS(1)); } } } diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/Cargo.toml b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/Cargo.toml index ee3ce0bef..a76e6f1c1 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/Cargo.toml +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/Cargo.toml @@ -43,5 +43,8 @@ clap = { workspace = true } # Multi-BSSID WiFi scanning pipeline (ADR-022 Phase 3) wifi-densepose-wifiscan = { version = "0.3.0", path = "../wifi-densepose-wifiscan" } +# Signal processing with RuvSense pose tracker (accuracy sprint) +wifi-densepose-signal = { version = "0.3.0", path = "../wifi-densepose-signal" } + [dev-dependencies] tempfile = "3.10" diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs index 80d2364d5..b89cb58cf 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs @@ -10,6 +10,10 @@ //! //! The trained model is serialised as JSON and hot-loaded at runtime so that //! the classification thresholds adapt to the specific room and ESP32 placement. +//! +//! Classes are discovered dynamically from training data filenames instead of +//! being hardcoded, so new activity classes can be added just by recording data +//! with the appropriate filename convention. use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -20,9 +24,8 @@ use std::path::{Path, PathBuf}; /// Extended feature vector: 7 server features + 8 subcarrier-derived features = 15. const N_FEATURES: usize = 15; -/// Activity classes we recognise. -pub const CLASSES: &[&str] = &["absent", "present_still", "present_moving", "active"]; -const N_CLASSES: usize = 4; +/// Default class names for backward compatibility with old saved models. +const DEFAULT_CLASSES: &[&str] = &["absent", "present_still", "present_moving", "active"]; /// Extract extended feature vector from a JSONL frame (features + raw amplitudes). pub fn features_from_frame(frame: &serde_json::Value) -> [f64; N_FEATURES] { @@ -124,8 +127,9 @@ pub struct ClassStats { pub struct AdaptiveModel { /// Per-class feature statistics (centroid + spread). pub class_stats: Vec, - /// Logistic regression weights: [N_CLASSES x (N_FEATURES + 1)] (last = bias). - pub weights: Vec<[f64; N_FEATURES + 1]>, + /// Logistic regression weights: [n_classes x (N_FEATURES + 1)] (last = bias). + /// Dynamic: the outer Vec length equals the number of discovered classes. + pub weights: Vec>, /// Global feature normalisation: mean and stddev across all training data. pub global_mean: [f64; N_FEATURES], pub global_std: [f64; N_FEATURES], @@ -133,27 +137,38 @@ pub struct AdaptiveModel { pub trained_frames: usize, pub training_accuracy: f64, pub version: u32, + /// Dynamically discovered class names (in index order). + #[serde(default = "default_class_names")] + pub class_names: Vec, +} + +/// Backward-compatible fallback for models saved without class_names. +fn default_class_names() -> Vec { + DEFAULT_CLASSES.iter().map(|s| s.to_string()).collect() } impl Default for AdaptiveModel { fn default() -> Self { + let n_classes = DEFAULT_CLASSES.len(); Self { class_stats: Vec::new(), - weights: vec![[0.0; N_FEATURES + 1]; N_CLASSES], + weights: vec![vec![0.0; N_FEATURES + 1]; n_classes], global_mean: [0.0; N_FEATURES], global_std: [1.0; N_FEATURES], trained_frames: 0, training_accuracy: 0.0, version: 1, + class_names: default_class_names(), } } } impl AdaptiveModel { /// Classify a raw feature vector. Returns (class_label, confidence). - pub fn classify(&self, raw_features: &[f64; N_FEATURES]) -> (&'static str, f64) { - if self.weights.is_empty() || self.class_stats.is_empty() { - return ("present_still", 0.5); + pub fn classify(&self, raw_features: &[f64; N_FEATURES]) -> (String, f64) { + let n_classes = self.weights.len(); + if n_classes == 0 || self.class_stats.is_empty() { + return ("present_still".to_string(), 0.5); } // Normalise features. @@ -163,8 +178,8 @@ impl AdaptiveModel { } // Compute logits: w·x + b for each class. - let mut logits = [0.0f64; N_CLASSES]; - for c in 0..N_CLASSES.min(self.weights.len()) { + let mut logits: Vec = vec![0.0; n_classes]; + for c in 0..n_classes { let w = &self.weights[c]; let mut z = w[N_FEATURES]; // bias for i in 0..N_FEATURES { @@ -176,8 +191,8 @@ impl AdaptiveModel { // Softmax. let max_logit = logits.iter().cloned().fold(f64::NEG_INFINITY, f64::max); let exp_sum: f64 = logits.iter().map(|z| (z - max_logit).exp()).sum(); - let mut probs = [0.0f64; N_CLASSES]; - for c in 0..N_CLASSES { + let mut probs: Vec = vec![0.0; n_classes]; + for c in 0..n_classes { probs[c] = ((logits[c] - max_logit).exp()) / exp_sum; } @@ -185,7 +200,11 @@ impl AdaptiveModel { let (best_c, best_p) = probs.iter().enumerate() .max_by(|a, b| a.1.partial_cmp(b.1).unwrap()) .unwrap(); - let label = if best_c < CLASSES.len() { CLASSES[best_c] } else { "present_still" }; + let label = if best_c < self.class_names.len() { + self.class_names[best_c].clone() + } else { + "present_still".to_string() + }; (label, *best_p) } @@ -228,48 +247,88 @@ fn load_recording(path: &Path, class_idx: usize) -> Vec { }).collect() } -/// Map a recording filename to a class index. -fn classify_recording_name(name: &str) -> Option { +/// Map a recording filename to a class name (String). +/// Returns the discovered class name for the file, or None if it cannot be determined. +fn classify_recording_name(name: &str) -> Option { let lower = name.to_lowercase(); - if lower.contains("empty") || lower.contains("absent") { Some(0) } - else if lower.contains("still") || lower.contains("sitting") || lower.contains("standing") { Some(1) } - else if lower.contains("walking") || lower.contains("moving") { Some(2) } - else if lower.contains("active") || lower.contains("exercise") || lower.contains("running") { Some(3) } - else { None } + // Strip "train_" prefix and ".jsonl" suffix, then extract the class label. + // Convention: train__.jsonl + // The class is the first segment after "train_" that matches a known pattern, + // or the entire middle portion if no pattern matches. + + // Check common patterns first for backward compat + if lower.contains("empty") || lower.contains("absent") { return Some("absent".into()); } + if lower.contains("still") || lower.contains("sitting") || lower.contains("standing") { return Some("present_still".into()); } + if lower.contains("walking") || lower.contains("moving") { return Some("present_moving".into()); } + if lower.contains("active") || lower.contains("exercise") || lower.contains("running") { return Some("active".into()); } + + // Fallback: extract class from filename structure train__*.jsonl + let stem = lower.trim_start_matches("train_").trim_end_matches(".jsonl"); + let class_name = stem.split('_').next().unwrap_or(stem); + if !class_name.is_empty() { + Some(class_name.to_string()) + } else { + None + } } /// Train a model from labeled JSONL recordings in a directory. /// -/// Recordings are matched to classes by filename pattern: -/// - `*empty*` / `*absent*` → absent (0) -/// - `*still*` / `*sitting*` → present_still (1) -/// - `*walking*` / `*moving*` → present_moving (2) -/// - `*active*` / `*exercise*`→ active (3) +/// Recordings are matched to classes by filename pattern. Classes are discovered +/// dynamically from the training data filenames: +/// - `*empty*` / `*absent*` → absent +/// - `*still*` / `*sitting*` → present_still +/// - `*walking*` / `*moving*` → present_moving +/// - `*active*` / `*exercise*`→ active +/// - Any other `train__*.jsonl` → pub fn train_from_recordings(recordings_dir: &Path) -> Result { - // Scan for train_* files. - let mut samples: Vec = Vec::new(); - let entries = std::fs::read_dir(recordings_dir) - .map_err(|e| format!("Cannot read {}: {}", recordings_dir.display(), e))?; - - for entry in entries.flatten() { + // First pass: scan filenames to discover all unique class names. + let entries: Vec<_> = std::fs::read_dir(recordings_dir) + .map_err(|e| format!("Cannot read {}: {}", recordings_dir.display(), e))? + .flatten() + .collect(); + + let mut class_map: HashMap = HashMap::new(); + let mut class_names: Vec = Vec::new(); + + // Collect (entry, class_name) pairs for files that match. + let mut file_classes: Vec<(PathBuf, String, String)> = Vec::new(); // (path, fname, class_name) + for entry in &entries { let fname = entry.file_name().to_string_lossy().to_string(); if !fname.starts_with("train_") || !fname.ends_with(".jsonl") { continue; } - if let Some(class_idx) = classify_recording_name(&fname) { - let loaded = load_recording(&entry.path(), class_idx); - eprintln!(" Loaded {}: {} frames → class '{}'", - fname, loaded.len(), CLASSES[class_idx]); - samples.extend(loaded); + if let Some(class_name) = classify_recording_name(&fname) { + if !class_map.contains_key(&class_name) { + let idx = class_names.len(); + class_map.insert(class_name.clone(), idx); + class_names.push(class_name.clone()); + } + file_classes.push((entry.path(), fname, class_name)); } } + let n_classes = class_names.len(); + if n_classes == 0 { + return Err("No training samples found. Record data with train_* prefix.".into()); + } + + // Second pass: load recordings with the discovered class indices. + let mut samples: Vec = Vec::new(); + for (path, fname, class_name) in &file_classes { + let class_idx = class_map[class_name]; + let loaded = load_recording(path, class_idx); + eprintln!(" Loaded {}: {} frames → class '{}'", + fname, loaded.len(), class_name); + samples.extend(loaded); + } + if samples.is_empty() { return Err("No training samples found. Record data with train_* prefix.".into()); } let n = samples.len(); - eprintln!("Total training samples: {n}"); + eprintln!("Total training samples: {n} across {n_classes} classes: {:?}", class_names); // ── Compute global normalisation stats ── let mut global_mean = [0.0f64; N_FEATURES]; @@ -289,9 +348,9 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result Result Result Result> = vec![vec![0.0f64; N_FEATURES + 1]; n_classes]; let lr = 0.1; let epochs = 200; let batch_size = 32; @@ -348,19 +407,19 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result> = vec![vec![0.0f64; N_FEATURES + 1]; n_classes]; for (x, target) in batch { // Forward: softmax. - let mut logits = [0.0f64; N_CLASSES]; - for c in 0..N_CLASSES { + let mut logits: Vec = vec![0.0; n_classes]; + for c in 0..n_classes { logits[c] = weights[c][N_FEATURES]; // bias for i in 0..N_FEATURES { logits[c] += weights[c][i] * x[i]; @@ -368,8 +427,8 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result = vec![0.0; n_classes]; + for c in 0..n_classes { probs[c] = ((logits[c] - max_l).exp()) / exp_sum; } @@ -377,7 +436,7 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result Result Result = vec![0.0; n_classes]; + for c in 0..n_classes { logits[c] = weights[c][N_FEATURES]; for i in 0..N_FEATURES { logits[c] += weights[c][i] * x[i]; @@ -422,12 +481,12 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result = vec![0.0; n_classes]; + for c in 0..n_classes { logits[c] = weights[c][N_FEATURES]; for i in 0..N_FEATURES { logits[c] += weights[c][i] * x[i]; @@ -438,9 +497,9 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result Result FieldModelConfig { + FieldModelConfig { + n_links: 1, + ..FieldModelConfig::default() + } +} + +/// Estimate occupancy using the FieldModel when calibrated, falling back +/// to the score-based heuristic otherwise. +/// +/// Prefers `estimate_occupancy()` (eigenvalue-based) when the model is +/// calibrated and enough frames are available. Falls back to perturbation +/// energy thresholds, then to the score heuristic. +pub fn occupancy_or_fallback( + field: &FieldModel, + frame_history: &VecDeque>, + smoothed_score: f64, + prev_count: usize, +) -> usize { + match field.status() { + CalibrationStatus::Fresh | CalibrationStatus::Stale => { + let frames: Vec> = frame_history + .iter() + .rev() + .take(OCCUPANCY_WINDOW) + .cloned() + .collect(); + + if frames.is_empty() { + return score_to_person_count(smoothed_score, prev_count); + } + + // Try eigenvalue-based occupancy first (best accuracy). + match field.estimate_occupancy(&frames) { + Ok(count) => return count, + Err(_) => {} // fall through to perturbation energy + } + + // Fallback: perturbation energy thresholds. + // FieldModel expects [n_links][n_subcarriers] — we use n_links=1. + let observation = vec![frames[0].clone()]; + match field.extract_perturbation(&observation) { + Ok(perturbation) => { + if perturbation.total_energy > ENERGY_THRESH_3 { + 3 + } else if perturbation.total_energy > ENERGY_THRESH_2 { + 2 + } else if perturbation.total_energy > 1.0 { + 1 + } else { + 0 + } + } + Err(_) => score_to_person_count(smoothed_score, prev_count), + } + } + _ => score_to_person_count(smoothed_score, prev_count), + } +} + +/// Feed the latest frame to the FieldModel during calibration collection. +/// +/// Only acts when the model status is `Collecting`. Wraps the latest frame +/// as a single-link observation (n_links=1) and feeds it. +pub fn maybe_feed_calibration(field: &mut FieldModel, frame_history: &VecDeque>) { + if field.status() != CalibrationStatus::Collecting { + return; + } + if let Some(latest) = frame_history.back() { + // Single-link observation: [1][n_subcarriers] + let observations = vec![latest.clone()]; + if let Err(e) = field.feed_calibration(&observations) { + tracing::debug!("FieldModel calibration feed: {e}"); + } + } +} + +/// Parse node positions from a semicolon-delimited string. +/// +/// Format: `"x,y,z;x,y,z;..."` where each coordinate is an `f32`. +/// Malformed entries are skipped with a warning log. +pub fn parse_node_positions(input: &str) -> Vec<[f32; 3]> { + if input.is_empty() { + return Vec::new(); + } + input + .split(';') + .enumerate() + .filter_map(|(idx, triplet)| { + let parts: Vec<&str> = triplet.split(',').collect(); + if parts.len() != 3 { + tracing::warn!("Skipping malformed node position entry {idx}: '{triplet}' (expected x,y,z)"); + return None; + } + match (parts[0].parse::(), parts[1].parse::(), parts[2].parse::()) { + (Ok(x), Ok(y), Ok(z)) => Some([x, y, z]), + _ => { + tracing::warn!("Skipping unparseable node position entry {idx}: '{triplet}'"); + None + } + } + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_node_positions() { + let positions = parse_node_positions("0,0,1.5;3,0,1.5;1.5,3,1.5"); + assert_eq!(positions.len(), 3); + assert_eq!(positions[0], [0.0, 0.0, 1.5]); + assert_eq!(positions[1], [3.0, 0.0, 1.5]); + assert_eq!(positions[2], [1.5, 3.0, 1.5]); + } + + #[test] + fn test_parse_node_positions_empty() { + let positions = parse_node_positions(""); + assert!(positions.is_empty()); + } + + #[test] + fn test_parse_node_positions_invalid() { + let positions = parse_node_positions("abc;1,2,3"); + assert_eq!(positions.len(), 1); + assert_eq!(positions[0], [1.0, 2.0, 3.0]); + } + + #[test] + fn test_parse_node_positions_partial_triplet() { + let positions = parse_node_positions("1,2;3,4,5"); + assert_eq!(positions.len(), 1); + assert_eq!(positions[0], [3.0, 4.0, 5.0]); + } +} diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs index b0c16803a..fd4f6c2b8 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs @@ -9,8 +9,11 @@ //! Replaces both ws_server.py and the Python HTTP server. mod adaptive_classifier; +mod field_bridge; +mod multistatic_bridge; mod rvf_container; mod rvf_pipeline; +mod tracker_bridge; mod vital_signs; // Training pipeline modules (exposed via lib.rs) @@ -52,6 +55,11 @@ use wifi_densepose_wifiscan::{ }; use wifi_densepose_wifiscan::parse_netsh_output as parse_netsh_bssid_output; +// Accuracy sprint: Kalman tracker, multistatic fusion, field model +use wifi_densepose_signal::ruvsense::pose_tracker::PoseTracker; +use wifi_densepose_signal::ruvsense::multistatic::{MultistaticFuser, MultistaticConfig}; +use wifi_densepose_signal::ruvsense::field_model::{FieldModel, CalibrationStatus}; + // ── CLI ────────────────────────────────────────────────────────────────────── #[derive(Parser, Debug)] @@ -144,6 +152,14 @@ struct Args { /// Build fingerprint index from embeddings (env|activity|temporal|person) #[arg(long, value_name = "TYPE")] build_index: Option, + + /// Node positions for multistatic fusion (format: "x,y,z;x,y,z;...") + #[arg(long, env = "SENSING_NODE_POSITIONS")] + node_positions: Option, + + /// Start field model calibration on boot (empty room required) + #[arg(long)] + calibrate: bool, } // ── Data types ─────────────────────────────────────────────────────────────── @@ -212,6 +228,9 @@ struct SensingUpdate { /// Estimated person count from CSI feature heuristics (1-3 for single ESP32). #[serde(skip_serializing_if = "Option::is_none")] estimated_persons: Option, + /// Per-node feature breakdown for multi-node deployments. + #[serde(skip_serializing_if = "Option::is_none")] + node_features: Option>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -279,9 +298,9 @@ struct BoundingBox { /// Each ESP32 node gets its own frame history, smoothing buffers, and vital /// sign detector so that data from different nodes is never mixed. struct NodeState { - frame_history: VecDeque>, + pub(crate) frame_history: VecDeque>, smoothed_person_score: f64, - prev_person_count: usize, + pub(crate) prev_person_count: usize, smoothed_motion: f64, current_motion_level: String, debounce_counter: u32, @@ -297,12 +316,12 @@ struct NodeState { rssi_history: VecDeque, vital_detector: VitalSignDetector, latest_vitals: VitalSigns, - last_frame_time: Option, + pub(crate) last_frame_time: Option, edge_vitals: Option, } impl NodeState { - fn new() -> Self { + pub(crate) fn new() -> Self { Self { frame_history: VecDeque::new(), smoothed_person_score: 0.0, @@ -328,6 +347,18 @@ impl NodeState { } } +/// Per-node feature info for WebSocket broadcasts (multi-node support). +#[derive(Debug, Clone, Serialize, Deserialize)] +struct PerNodeFeatureInfo { + node_id: u8, + features: FeatureInfo, + classification: ClassificationInfo, + rssi_dbm: f64, + last_seen_ms: u64, + frame_rate_hz: f64, + stale: bool, +} + /// Shared application state struct AppStateInner { latest_update: Option, @@ -421,6 +452,15 @@ struct AppStateInner { /// Per-node sensing state for multi-node deployments. /// Keyed by `node_id` from the ESP32 frame header. node_states: HashMap, + // ── Accuracy sprint: Kalman tracker, multistatic fusion, eigenvalue counting ── + /// Global Kalman-based pose tracker for stable person IDs and smoothed keypoints. + pose_tracker: PoseTracker, + /// Instant of last tracker update (for computing dt). + last_tracker_instant: Option, + /// Attention-weighted multi-node CSI fusion engine. + multistatic_fuser: MultistaticFuser, + /// SVD-based room field model for eigenvalue person counting (None until calibration). + field_model: Option, } /// If no ESP32 frame arrives within this duration, source reverts to offline. @@ -430,6 +470,31 @@ impl AppStateInner { /// Return the effective data source, accounting for ESP32 frame timeout. /// If the source is "esp32" but no frame has arrived in 5 seconds, returns /// "esp32:offline" so the UI can distinguish active vs stale connections. + /// Person count: eigenvalue-based if field model is calibrated, else heuristic. + /// Uses global frame_history if populated, otherwise the freshest per-node history. + fn person_count(&self) -> usize { + match self.field_model.as_ref() { + Some(fm) => { + // Prefer global frame_history (populated by wifi/simulate paths). + // Fall back to freshest per-node history (populated by ESP32 paths). + let history = if !self.frame_history.is_empty() { + &self.frame_history + } else { + // Find the node with the most recent frame + self.node_states.values() + .filter(|ns| !ns.frame_history.is_empty()) + .max_by_key(|ns| ns.last_frame_time) + .map(|ns| &ns.frame_history) + .unwrap_or(&self.frame_history) + }; + field_bridge::occupancy_or_fallback( + fm, history, self.smoothed_person_score, self.prev_person_count, + ) + } + None => score_to_person_count(self.smoothed_person_score, self.prev_person_count), + } + } + fn effective_source(&self) -> String { if self.source == "esp32" { if let Some(last) = self.last_esp32_frame { @@ -570,7 +635,9 @@ fn parse_esp32_frame(buf: &[u8]) -> Option { let n_subcarriers = buf[6]; let freq_mhz = u16::from_le_bytes([buf[8], buf[9]]); let sequence = u32::from_le_bytes([buf[10], buf[11], buf[12], buf[13]]); - let rssi = buf[14] as i8; + let rssi_raw = buf[14] as i8; + // Fix RSSI sign: ensure it's always negative (dBm convention). + let rssi = if rssi_raw > 0 { rssi_raw.saturating_neg() } else { rssi_raw }; let noise_floor = buf[15] as i8; let iq_start = 20; @@ -1418,7 +1485,7 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) { let raw_score = compute_person_score(&features); s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10; let est_persons = if classification.presence { - let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count); + let count = s.person_count(); s.prev_person_count = count; count } else { @@ -1455,12 +1522,16 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) { model_status: None, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + node_features: None, }; - // Populate persons from the sensing update. - let persons = derive_pose_from_sensing(&update); - if !persons.is_empty() { - update.persons = Some(persons); + // Populate persons from the sensing update (Kalman-smoothed via tracker). + let raw_persons = derive_pose_from_sensing(&update); + let tracked = tracker_bridge::tracker_update( + &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons, + ); + if !tracked.is_empty() { + update.persons = Some(tracked); } if let Ok(json) = serde_json::to_string(&update) { @@ -1551,7 +1622,7 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) { let raw_score = compute_person_score(&features); s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10; let est_persons = if classification.presence { - let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count); + let count = s.person_count(); s.prev_person_count = count; count } else { @@ -1588,11 +1659,15 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) { model_status: None, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + node_features: None, }; - let persons = derive_pose_from_sensing(&update); - if !persons.is_empty() { - update.persons = Some(persons); + let raw_persons = derive_pose_from_sensing(&update); + let tracked = tracker_bridge::tracker_update( + &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons, + ); + if !tracked.is_empty() { + update.persons = Some(tracked); } if let Ok(json) = serde_json::to_string(&update) { @@ -1769,9 +1844,13 @@ async fn handle_ws_pose_client(mut socket: WebSocket, state: SharedState) { keypoints, zone: "zone_1".into(), }] - }).unwrap_or_else(|| derive_pose_from_sensing(&sensing)) + }).unwrap_or_else(|| { + // Prefer tracked persons from broadcast if available + sensing.persons.clone().unwrap_or_else(|| derive_pose_from_sensing(&sensing)) + }) } else { - derive_pose_from_sensing(&sensing) + // Prefer tracked persons from broadcast if available + sensing.persons.clone().unwrap_or_else(|| derive_pose_from_sensing(&sensing)) }; let pose_msg = serde_json::json!({ @@ -2210,7 +2289,7 @@ async fn api_info(State(state): State) -> Json { async fn pose_current(State(state): State) -> Json { let s = state.read().await; let persons = match &s.latest_update { - Some(update) => derive_pose_from_sensing(update), + Some(update) => update.persons.clone().unwrap_or_else(|| derive_pose_from_sensing(update)), None => vec![], }; Json(serde_json::json!({ @@ -2761,6 +2840,88 @@ async fn adaptive_unload(State(state): State) -> Json) -> Json { + let mut s = state.write().await; + // Guard: don't discard an in-progress or fresh calibration + if let Some(ref fm) = s.field_model { + match fm.status() { + CalibrationStatus::Collecting => { + return Json(serde_json::json!({ + "success": false, + "error": "Calibration already in progress. Call /calibration/stop first.", + "frame_count": fm.calibration_frame_count(), + })); + } + CalibrationStatus::Fresh => { + return Json(serde_json::json!({ + "success": false, + "error": "A fresh calibration already exists. Call /calibration/stop or wait for expiry.", + })); + } + _ => {} // Stale/Expired/Uncalibrated — ok to recalibrate + } + } + match FieldModel::new(field_bridge::single_link_config()) { + Ok(fm) => { + s.field_model = Some(fm); + Json(serde_json::json!({ + "success": true, + "message": "Calibration started — keep room empty while frames accumulate.", + })) + } + Err(e) => Json(serde_json::json!({ + "success": false, + "error": format!("{e}"), + })), + } +} + +async fn calibration_stop(State(state): State) -> Json { + let mut s = state.write().await; + if let Some(ref mut fm) = s.field_model { + let ts = chrono::Utc::now().timestamp_micros() as u64; + match fm.finalize_calibration(ts, 0) { + Ok(modes) => { + let baseline = modes.baseline_eigenvalue_count; + let variance_explained = modes.variance_explained; + info!("Field model calibrated: baseline_eigenvalues={baseline}, variance_explained={variance_explained:.2}"); + Json(serde_json::json!({ + "success": true, + "baseline_eigenvalue_count": baseline, + "variance_explained": variance_explained, + "frame_count": fm.calibration_frame_count(), + })) + } + Err(e) => Json(serde_json::json!({ + "success": false, + "error": format!("{e}"), + })), + } + } else { + Json(serde_json::json!({ + "success": false, + "error": "No field model active — call /calibration/start first.", + })) + } +} + +async fn calibration_status(State(state): State) -> Json { + let s = state.read().await; + match s.field_model.as_ref() { + Some(fm) => Json(serde_json::json!({ + "active": true, + "status": format!("{:?}", fm.status()), + "frame_count": fm.calibration_frame_count(), + })), + None => Json(serde_json::json!({ + "active": false, + "status": "none", + })), + } +} + /// Generate a simple timestamp string (epoch seconds) for recording IDs. fn chrono_timestamp() -> u64 { std::time::SystemTime::now() @@ -2907,6 +3068,34 @@ async fn sona_activate( } } +/// GET /api/v1/nodes — per-node health and feature info. +async fn nodes_endpoint(State(state): State) -> Json { + let s = state.read().await; + let now = std::time::Instant::now(); + let nodes: Vec = s.node_states.iter() + .map(|(&id, ns)| { + let elapsed_ms = ns.last_frame_time + .map(|t| now.duration_since(t).as_millis() as u64) + .unwrap_or(999999); + let stale = elapsed_ms > 5000; + let status = if stale { "stale" } else { "active" }; + let rssi = ns.rssi_history.back().copied().unwrap_or(-90.0); + serde_json::json!({ + "node_id": id, + "status": status, + "last_seen_ms": elapsed_ms, + "rssi_dbm": rssi, + "motion_level": &ns.current_motion_level, + "person_count": ns.prev_person_count, + }) + }) + .collect(); + Json(serde_json::json!({ + "nodes": nodes, + "total": nodes.len(), + })) +} + async fn info_page() -> Html { Html(format!( "\ @@ -2998,12 +3187,33 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { else if vitals.presence { 0.3 } else { 0.05 }; - // Aggregate person count across all active nodes. + // Aggregate person count: gate on presence first (matching WiFi path). let now = std::time::Instant::now(); - let total_persons: usize = s.node_states.values() - .filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10)) - .map(|n| n.prev_person_count) - .sum(); + let total_persons = if vitals.presence { + let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback( + &s.multistatic_fuser, &s.node_states, + ); + match fused { + Some(ref f) => { + let score = multistatic_bridge::compute_person_score_from_amplitudes(&f.fused_amplitude); + s.smoothed_person_score = s.smoothed_person_score * 0.90 + score * 0.10; + let count = s.person_count(); + s.prev_person_count = count; + count.max(1) // presence=true => at least 1 + } + None => fallback_count.unwrap_or(0).max(1), + } + } else { + s.prev_person_count = 0; + 0 + }; + + // Feed field model calibration if active (use per-node history for ESP32). + if let Some(ref mut fm) = s.field_model { + if let Some(ns) = s.node_states.get(&node_id) { + field_bridge::maybe_feed_calibration(fm, &ns.frame_history); + } + } // Build nodes array with all active nodes. let active_nodes: Vec = s.node_states.iter() @@ -3062,11 +3272,15 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { model_status: None, persons: None, estimated_persons: if total_persons > 0 { Some(total_persons) } else { None }, + node_features: None, }; - let persons = derive_pose_from_sensing(&update); - if !persons.is_empty() { - update.persons = Some(persons); + let raw_persons = derive_pose_from_sensing(&update); + let tracked = tracker_bridge::tracker_update( + &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons, + ); + if !tracked.is_empty() { + update.persons = Some(tracked); } if let Ok(json) = serde_json::to_string(&update) { @@ -3196,12 +3410,33 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { else if classification.motion_level == "present_still" { 0.3 } else { 0.05 }; - // Aggregate person count across all active nodes. + // Aggregate person count: gate on presence first (matching WiFi path). let now = std::time::Instant::now(); - let total_persons: usize = s.node_states.values() - .filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10)) - .map(|n| n.prev_person_count) - .sum(); + let total_persons = if classification.presence { + let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback( + &s.multistatic_fuser, &s.node_states, + ); + match fused { + Some(ref f) => { + let score = multistatic_bridge::compute_person_score_from_amplitudes(&f.fused_amplitude); + s.smoothed_person_score = s.smoothed_person_score * 0.90 + score * 0.10; + let count = s.person_count(); + s.prev_person_count = count; + count.max(1) + } + None => fallback_count.unwrap_or(0).max(1), + } + } else { + s.prev_person_count = 0; + 0 + }; + + // Feed field model calibration if active (use per-node history for ESP32). + if let Some(ref mut fm) = s.field_model { + if let Some(ns) = s.node_states.get(&node_id) { + field_bridge::maybe_feed_calibration(fm, &ns.frame_history); + } + } // Build nodes array with all active nodes. let active_nodes: Vec = s.node_states.iter() @@ -3240,11 +3475,15 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { model_status: None, persons: None, estimated_persons: if total_persons > 0 { Some(total_persons) } else { None }, + node_features: None, }; - let persons = derive_pose_from_sensing(&update); - if !persons.is_empty() { - update.persons = Some(persons); + let raw_persons = derive_pose_from_sensing(&update); + let tracked = tracker_bridge::tracker_update( + &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons, + ); + if !tracked.is_empty() { + update.persons = Some(tracked); } if let Ok(json) = serde_json::to_string(&update) { @@ -3311,7 +3550,7 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) { let raw_score = compute_person_score(&features); s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10; let est_persons = if classification.presence { - let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count); + let count = s.person_count(); s.prev_person_count = count; count } else { @@ -3358,12 +3597,16 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) { }, persons: None, estimated_persons: if est_persons > 0 { Some(est_persons) } else { None }, + node_features: None, }; - // Populate persons from the sensing update. - let persons = derive_pose_from_sensing(&update); - if !persons.is_empty() { - update.persons = Some(persons); + // Populate persons from the sensing update (Kalman-smoothed via tracker). + let raw_persons = derive_pose_from_sensing(&update); + let tracked = tracker_bridge::tracker_update( + &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons, + ); + if !tracked.is_empty() { + update.persons = Some(tracked); } if update.classification.presence { @@ -3992,6 +4235,29 @@ async fn main() { m }), node_states: HashMap::new(), + // Accuracy sprint + pose_tracker: PoseTracker::new(), + last_tracker_instant: None, + multistatic_fuser: { + let mut fuser = MultistaticFuser::with_config(MultistaticConfig { + min_nodes: 1, // single-node passthrough + ..Default::default() + }); + if let Some(ref pos_str) = args.node_positions { + let positions = field_bridge::parse_node_positions(pos_str); + if !positions.is_empty() { + info!("Configured {} node positions for multistatic fusion", positions.len()); + fuser.set_node_positions(positions); + } + } + fuser + }, + field_model: if args.calibrate { + info!("Field model calibration enabled — room should be empty during startup"); + FieldModel::new(field_bridge::single_link_config()).ok() + } else { + None + }, })); // Start background tasks based on source @@ -4045,6 +4311,8 @@ async fn main() { .route("/api/v1/metrics", get(health_metrics)) // Sensing endpoints .route("/api/v1/sensing/latest", get(latest)) + // Per-node health endpoint + .route("/api/v1/nodes", get(nodes_endpoint)) // Vital sign endpoints .route("/api/v1/vital-signs", get(vital_signs_endpoint)) .route("/api/v1/edge-vitals", get(edge_vitals_endpoint)) @@ -4086,6 +4354,10 @@ async fn main() { .route("/api/v1/adaptive/train", post(adaptive_train)) .route("/api/v1/adaptive/status", get(adaptive_status)) .route("/api/v1/adaptive/unload", post(adaptive_unload)) + // Field model calibration (eigenvalue-based person counting) + .route("/api/v1/calibration/start", post(calibration_start)) + .route("/api/v1/calibration/stop", post(calibration_stop)) + .route("/api/v1/calibration/status", get(calibration_status)) // Static UI files .nest_service("/ui", ServeDir::new(&ui_path)) .layer(SetResponseHeaderLayer::overriding( diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs new file mode 100644 index 000000000..794b15bc7 --- /dev/null +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs @@ -0,0 +1,264 @@ +//! Bridge between sensing-server per-node state and the signal crate's +//! `MultistaticFuser` for attention-weighted CSI fusion across ESP32 nodes. +//! +//! This module converts the server's `NodeState` (f64 amplitude history) into +//! `MultiBandCsiFrame`s that the multistatic fusion pipeline expects, then +//! drives `MultistaticFuser::fuse` with a graceful fallback when fusion fails +//! (e.g. insufficient nodes or timestamp spread). + +use std::collections::HashMap; +use std::sync::LazyLock; +use std::time::{Duration, Instant}; + +use wifi_densepose_signal::hardware_norm::{CanonicalCsiFrame, HardwareType}; +use wifi_densepose_signal::ruvsense::multiband::MultiBandCsiFrame; +use wifi_densepose_signal::ruvsense::multistatic::{FusedSensingFrame, MultistaticFuser}; + +use super::NodeState; + +/// Maximum age for a node frame to be considered active (10 seconds). +const STALE_THRESHOLD: Duration = Duration::from_secs(10); + +/// Default WiFi channel frequency (MHz) used for single-channel frames. +const DEFAULT_FREQ_MHZ: u32 = 2437; // Channel 6 + +/// Monotonic reference point for timestamp generation. All node timestamps +/// are relative to this instant, avoiding wall-clock/monotonic mixing issues. +static EPOCH: LazyLock = LazyLock::new(Instant::now); + +/// Convert a single `NodeState` into a `MultiBandCsiFrame` suitable for +/// multistatic fusion. +/// +/// Returns `None` when the node has no frame history or no recorded +/// `last_frame_time`. +pub fn node_frame_from_state(node_id: u8, ns: &NodeState) -> Option { + let last_time = ns.last_frame_time.as_ref()?; + let latest = ns.frame_history.back()?; + if latest.is_empty() { + return None; + } + + let amplitude: Vec = latest.iter().map(|&v| v as f32).collect(); + let n_sub = amplitude.len(); + let phase = vec![0.0_f32; n_sub]; + + // Monotonic timestamp: microseconds since a shared process-local epoch. + // All nodes use the same reference so the fuser's guard_interval_us check + // compares apples to apples. No wall-clock mixing (immune to NTP jumps). + let timestamp_us = last_time.duration_since(*EPOCH).as_micros() as u64; + + let canonical = CanonicalCsiFrame { + amplitude, + phase, + hardware_type: HardwareType::Esp32S3, + }; + + Some(MultiBandCsiFrame { + node_id, + timestamp_us, + channel_frames: vec![canonical], + frequencies_mhz: vec![DEFAULT_FREQ_MHZ], + coherence: 1.0, // single-channel, perfect self-coherence + }) +} + +/// Collect `MultiBandCsiFrame`s from all active nodes. +/// +/// A node is considered active if its `last_frame_time` is within +/// [`STALE_THRESHOLD`] of `now`. +pub fn node_frames_from_states(node_states: &HashMap) -> Vec { + let now = Instant::now(); + let mut frames = Vec::with_capacity(node_states.len()); + + for (&node_id, ns) in node_states { + // Skip stale nodes + if let Some(ref t) = ns.last_frame_time { + if now.duration_since(*t) > STALE_THRESHOLD { + continue; + } + } else { + continue; + } + + if let Some(frame) = node_frame_from_state(node_id, ns) { + frames.push(frame); + } + } + + frames +} + +/// Attempt multistatic fusion; fall back to max per-node person count on failure. +/// +/// Returns `(fused_frame, fallback_person_count)`. When fusion succeeds, +/// `fallback_person_count` is `None` — the caller must compute count from +/// the fused amplitudes. On failure, returns the maximum per-node count +/// (not the sum, to avoid double-counting overlapping coverage). +pub fn fuse_or_fallback( + fuser: &MultistaticFuser, + node_states: &HashMap, +) -> (Option, Option) { + let frames = node_frames_from_states(node_states); + if frames.is_empty() { + return (None, Some(0)); + } + + match fuser.fuse(&frames) { + Ok(fused) => { + // Caller must compute person count from fused amplitudes. + (Some(fused), None) + } + Err(e) => { + tracing::debug!("Multistatic fusion failed ({e}), using per-node max fallback"); + // Use max (not sum) to avoid double-counting when nodes have overlapping coverage. + let max_count: usize = node_states + .values() + .filter(|ns| { + ns.last_frame_time + .map(|t| t.elapsed() <= STALE_THRESHOLD) + .unwrap_or(false) + }) + .map(|ns| ns.prev_person_count) + .max() + .unwrap_or(0); + (None, Some(max_count)) + } + } +} + +/// Compute a person-presence score from fused amplitude data. +/// +/// Uses the squared coefficient of variation (variance / mean^2) as a +/// lightweight proxy for body-induced CSI perturbation. A flat amplitude +/// vector (no person) yields a score near zero; a vector with high variance +/// relative to its mean (person moving) yields a score approaching 1.0. +pub fn compute_person_score_from_amplitudes(amplitudes: &[f32]) -> f64 { + if amplitudes.is_empty() { + return 0.0; + } + + let n = amplitudes.len() as f64; + let sum: f64 = amplitudes.iter().map(|&a| a as f64).sum(); + let mean = sum / n; + + let variance: f64 = amplitudes.iter().map(|&a| { + let diff = (a as f64) - mean; + diff * diff + }).sum::() / n; + + let score = variance / (mean * mean + 1e-10); + score.clamp(0.0, 1.0) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::VecDeque; + + /// Helper: build a minimal NodeState for testing. Uses `NodeState::new()` + /// then mutates the `pub(crate)` fields the bridge needs. + fn make_node_state( + frame_history: VecDeque>, + last_frame_time: Option, + prev_person_count: usize, + ) -> NodeState { + let mut ns = NodeState::new(); + ns.frame_history = frame_history; + ns.last_frame_time = last_frame_time; + ns.prev_person_count = prev_person_count; + ns + } + + #[test] + fn test_node_frame_from_empty_state() { + let ns = make_node_state(VecDeque::new(), Some(Instant::now()), 0); + assert!(node_frame_from_state(1, &ns).is_none()); + } + + #[test] + fn test_node_frame_from_state_no_time() { + let mut history = VecDeque::new(); + history.push_back(vec![1.0, 2.0, 3.0]); + let ns = make_node_state(history, None, 0); + assert!(node_frame_from_state(1, &ns).is_none()); + } + + #[test] + fn test_node_frame_conversion() { + let mut history = VecDeque::new(); + history.push_back(vec![10.0, 20.0, 30.5]); + let ns = make_node_state(history, Some(Instant::now()), 0); + + let frame = node_frame_from_state(42, &ns).expect("should produce a frame"); + assert_eq!(frame.node_id, 42); + assert_eq!(frame.channel_frames.len(), 1); + + let ch = &frame.channel_frames[0]; + assert_eq!(ch.amplitude.len(), 3); + assert!((ch.amplitude[0] - 10.0_f32).abs() < f32::EPSILON); + assert!((ch.amplitude[1] - 20.0_f32).abs() < f32::EPSILON); + assert!((ch.amplitude[2] - 30.5_f32).abs() < f32::EPSILON); + // Phase should be all zeros + assert!(ch.phase.iter().all(|&p| p == 0.0)); + assert_eq!(ch.hardware_type, HardwareType::Esp32S3); + } + + #[test] + fn test_stale_node_excluded() { + let mut states: HashMap = HashMap::new(); + + // Active node: frame just received + let mut active_history = VecDeque::new(); + active_history.push_back(vec![1.0, 2.0]); + states.insert(1, make_node_state(active_history, Some(Instant::now()), 1)); + + // Stale node: frame 20 seconds ago + let mut stale_history = VecDeque::new(); + stale_history.push_back(vec![3.0, 4.0]); + let stale_time = Instant::now() - Duration::from_secs(20); + states.insert(2, make_node_state(stale_history, Some(stale_time), 1)); + + let frames = node_frames_from_states(&states); + assert_eq!(frames.len(), 1, "stale node should be excluded"); + assert_eq!(frames[0].node_id, 1); + } + + #[test] + fn test_compute_person_score_empty() { + assert!((compute_person_score_from_amplitudes(&[]) - 0.0).abs() < f64::EPSILON); + } + + #[test] + fn test_compute_person_score_flat() { + // Constant amplitude => variance = 0 => score ~ 0 + let flat = vec![5.0_f32; 64]; + let score = compute_person_score_from_amplitudes(&flat); + assert!(score < 0.001, "flat signal should have near-zero score, got {score}"); + } + + #[test] + fn test_compute_person_score_varied() { + // High variance relative to mean should produce a positive score + let varied: Vec = (0..64).map(|i| if i % 2 == 0 { 1.0 } else { 10.0 }).collect(); + let score = compute_person_score_from_amplitudes(&varied); + assert!(score > 0.1, "varied signal should have positive score, got {score}"); + assert!(score <= 1.0, "score should be clamped to 1.0, got {score}"); + } + + #[test] + fn test_compute_person_score_clamped() { + // Near-zero mean with non-zero variance => would blow up without clamp + let vals = vec![0.0_f32, 0.0, 0.0, 0.001]; + let score = compute_person_score_from_amplitudes(&vals); + assert!(score <= 1.0, "score must be clamped to 1.0"); + } + + #[test] + fn test_fuse_or_fallback_empty() { + let fuser = MultistaticFuser::new(); + let states: HashMap = HashMap::new(); + let (fused, count) = fuse_or_fallback(&fuser, &states); + assert!(fused.is_none()); + assert_eq!(count, Some(0)); + } +} diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/tracker_bridge.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/tracker_bridge.rs new file mode 100644 index 000000000..b66d0fcf4 --- /dev/null +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/tracker_bridge.rs @@ -0,0 +1,409 @@ +//! Bridge between sensing-server PersonDetection types and signal crate PoseTracker. +//! +//! The sensing server uses f64 types (PersonDetection, PoseKeypoint, BoundingBox) +//! while the signal crate's PoseTracker operates on f32 Kalman states. This module +//! provides conversion functions and a single `tracker_update` entry point that +//! accepts server-side detections and returns tracker-smoothed results. + +use std::time::Instant; +use wifi_densepose_signal::ruvsense::{ + self, KeypointState, PoseTrack, TrackLifecycleState, TrackId, NUM_KEYPOINTS, +}; +use wifi_densepose_signal::ruvsense::pose_tracker::PoseTracker; + +use super::{BoundingBox, PersonDetection, PoseKeypoint}; + +/// COCO-17 keypoint names in index order. +const COCO_NAMES: [&str; 17] = [ + "nose", + "left_eye", + "right_eye", + "left_ear", + "right_ear", + "left_shoulder", + "right_shoulder", + "left_elbow", + "right_elbow", + "left_wrist", + "right_wrist", + "left_hip", + "right_hip", + "left_knee", + "right_knee", + "left_ankle", + "right_ankle", +]; + +/// Map a lowercase keypoint name to its COCO-17 index. +fn keypoint_name_to_coco_index(name: &str) -> Option { + COCO_NAMES.iter().position(|&n| n.eq_ignore_ascii_case(name)) +} + +/// Convert server-side PersonDetection slices into tracker-compatible keypoint arrays. +/// +/// For each person, maps named keypoints to COCO-17 positions. Unmapped slots are +/// filled with the centroid of the mapped keypoints so the Kalman filter has a +/// reasonable initial value rather than zeros. +fn detections_to_tracker_keypoints(persons: &[PersonDetection]) -> Vec<[[f32; 3]; 17]> { + persons + .iter() + .map(|person| { + let mut kps = [[0.0_f32; 3]; 17]; + let mut mapped_count = 0u32; + let mut cx = 0.0_f32; + let mut cy = 0.0_f32; + let mut cz = 0.0_f32; + + // First pass: place mapped keypoints and accumulate centroid + for kp in &person.keypoints { + if let Some(idx) = keypoint_name_to_coco_index(&kp.name) { + kps[idx] = [kp.x as f32, kp.y as f32, kp.z as f32]; + cx += kp.x as f32; + cy += kp.y as f32; + cz += kp.z as f32; + mapped_count += 1; + } + } + + // Compute centroid of mapped keypoints + let centroid = if mapped_count > 0 { + let n = mapped_count as f32; + [cx / n, cy / n, cz / n] + } else { + [0.0, 0.0, 0.0] + }; + + // Second pass: fill unmapped slots with centroid + // Build a set of mapped indices + let mut mapped = [false; 17]; + for kp in &person.keypoints { + if let Some(idx) = keypoint_name_to_coco_index(&kp.name) { + mapped[idx] = true; + } + } + for i in 0..17 { + if !mapped[i] { + kps[i] = centroid; + } + } + + kps + }) + .collect() +} + +/// Convert active PoseTracker tracks back into server-side PersonDetection values. +/// +/// Only tracks whose lifecycle `is_alive()` are included. +pub fn tracker_to_person_detections(tracker: &PoseTracker) -> Vec { + tracker + .active_tracks() + .into_iter() + .map(|track| { + let id = track.id.0 as u32; + + let confidence = match track.lifecycle { + TrackLifecycleState::Active => 0.9, + TrackLifecycleState::Tentative => 0.5, + TrackLifecycleState::Lost => 0.3, + TrackLifecycleState::Terminated => 0.0, + }; + + // Build keypoints from Kalman state + let keypoints: Vec = (0..NUM_KEYPOINTS) + .map(|i| { + let pos = track.keypoints[i].position(); + PoseKeypoint { + name: COCO_NAMES[i].to_string(), + x: pos[0] as f64, + y: pos[1] as f64, + z: pos[2] as f64, + confidence: track.keypoints[i].confidence as f64, + } + }) + .collect(); + + // Compute bounding box from observed keypoints only (confidence > 0). + // Unobserved slots (centroid-filled) collapse the bbox over time. + let mut min_x = f64::MAX; + let mut min_y = f64::MAX; + let mut max_x = f64::MIN; + let mut max_y = f64::MIN; + let mut observed = 0; + for kp in &keypoints { + if kp.confidence > 0.0 { + if kp.x < min_x { min_x = kp.x; } + if kp.y < min_y { min_y = kp.y; } + if kp.x > max_x { max_x = kp.x; } + if kp.y > max_y { max_y = kp.y; } + observed += 1; + } + } + + let bbox = if observed > 0 { + BoundingBox { + x: min_x, + y: min_y, + width: (max_x - min_x).max(0.01), + height: (max_y - min_y).max(0.01), + } + } else { + // No observed keypoints — use a default bbox at centroid + let cx = keypoints.iter().map(|k| k.x).sum::() / keypoints.len() as f64; + let cy = keypoints.iter().map(|k| k.y).sum::() / keypoints.len() as f64; + BoundingBox { x: cx - 0.3, y: cy - 0.5, width: 0.6, height: 1.0 } + }; + + PersonDetection { + id, + confidence, + keypoints, + bbox, + zone: "tracked".to_string(), + } + }) + .collect() +} + +/// Run one tracker cycle: predict, match detections, update, prune. +/// +/// This is the main entry point called each sensing frame. It: +/// 1. Computes dt from the previous call instant +/// 2. Predicts all existing tracks forward +/// 3. Greedily assigns detections to tracks by Mahalanobis cost +/// 4. Updates matched tracks, creates new tracks for unmatched detections +/// 5. Prunes terminated tracks +/// 6. Returns smoothed PersonDetection values from the tracker state +pub fn tracker_update( + tracker: &mut PoseTracker, + last_instant: &mut Option, + persons: Vec, +) -> Vec { + let now = Instant::now(); + let dt = last_instant.map_or(0.1_f32, |prev| now.duration_since(prev).as_secs_f32()); + *last_instant = Some(now); + + // Predict all tracks forward + tracker.predict_all(dt); + + if persons.is_empty() { + tracker.prune_terminated(); + return tracker_to_person_detections(tracker); + } + + // Convert detections to f32 keypoint arrays + let all_keypoints = detections_to_tracker_keypoints(&persons); + + // Compute centroids for each detection + let centroids: Vec<[f32; 3]> = all_keypoints + .iter() + .map(|kps| { + let mut c = [0.0_f32; 3]; + for kp in kps { + c[0] += kp[0]; + c[1] += kp[1]; + c[2] += kp[2]; + } + let n = NUM_KEYPOINTS as f32; + c[0] /= n; + c[1] /= n; + c[2] /= n; + c + }) + .collect(); + + // Greedy assignment: for each detection, find the best matching active track. + // Collect tracks once to avoid re-borrowing tracker per detection. + let active: Vec<(TrackId, [f32; 3])> = tracker.active_tracks().iter().map(|t| { + let centroid = { + let mut c = [0.0_f32; 3]; + for kp in &t.keypoints { + let p = kp.position(); + c[0] += p[0]; c[1] += p[1]; c[2] += p[2]; + } + let n = NUM_KEYPOINTS as f32; + [c[0] / n, c[1] / n, c[2] / n] + }; + (t.id, centroid) + }).collect(); + + let mut used_tracks: Vec = vec![false; active.len()]; + let mut matched: Vec> = vec![None; persons.len()]; + + for det_idx in 0..persons.len() { + let mut best_cost = f32::MAX; + let mut best_track_idx = None; + + let active_refs = tracker.active_tracks(); + for (track_idx, track) in active_refs.iter().enumerate() { + if used_tracks[track_idx] { + continue; + } + let cost = tracker.assignment_cost(track, ¢roids[det_idx], &[]); + if cost < best_cost { + best_cost = cost; + best_track_idx = Some(track_idx); + } + } + + // Mahalanobis gate: 9.0 (default TrackerConfig) + if best_cost < 9.0 { + if let Some(tidx) = best_track_idx { + matched[det_idx] = Some(active[tidx].0); + used_tracks[tidx] = true; + } + } + } + + // Timestamp for new/updated tracks (microseconds since UNIX epoch) + let timestamp_us = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_micros() as u64) + .unwrap_or(0); + + // Update matched tracks (uses update_keypoints for proper lifecycle transitions) + for (det_idx, track_id_opt) in matched.iter().enumerate() { + if let Some(track_id) = track_id_opt { + if let Some(track) = tracker.find_track_mut(*track_id) { + track.update_keypoints(&all_keypoints[det_idx], 0.08, 1.0, timestamp_us); + } + } + } + + // Create new tracks for unmatched detections + for (det_idx, track_id_opt) in matched.iter().enumerate() { + if track_id_opt.is_none() { + tracker.create_track(&all_keypoints[det_idx], timestamp_us); + } + } + + tracker.prune_terminated(); + tracker_to_person_detections(tracker) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_keypoint(name: &str, x: f64, y: f64, z: f64) -> PoseKeypoint { + PoseKeypoint { + name: name.to_string(), + x, + y, + z, + confidence: 0.9, + } + } + + fn make_person(id: u32, keypoints: Vec) -> PersonDetection { + PersonDetection { + id, + confidence: 0.8, + keypoints, + bbox: BoundingBox { + x: 0.0, + y: 0.0, + width: 1.0, + height: 1.0, + }, + zone: "test".to_string(), + } + } + + #[test] + fn test_keypoint_name_to_coco_index() { + assert_eq!(keypoint_name_to_coco_index("nose"), Some(0)); + assert_eq!(keypoint_name_to_coco_index("left_eye"), Some(1)); + assert_eq!(keypoint_name_to_coco_index("right_eye"), Some(2)); + assert_eq!(keypoint_name_to_coco_index("left_ear"), Some(3)); + assert_eq!(keypoint_name_to_coco_index("right_ear"), Some(4)); + assert_eq!(keypoint_name_to_coco_index("left_shoulder"), Some(5)); + assert_eq!(keypoint_name_to_coco_index("right_shoulder"), Some(6)); + assert_eq!(keypoint_name_to_coco_index("left_elbow"), Some(7)); + assert_eq!(keypoint_name_to_coco_index("right_elbow"), Some(8)); + assert_eq!(keypoint_name_to_coco_index("left_wrist"), Some(9)); + assert_eq!(keypoint_name_to_coco_index("right_wrist"), Some(10)); + assert_eq!(keypoint_name_to_coco_index("left_hip"), Some(11)); + assert_eq!(keypoint_name_to_coco_index("right_hip"), Some(12)); + assert_eq!(keypoint_name_to_coco_index("left_knee"), Some(13)); + assert_eq!(keypoint_name_to_coco_index("right_knee"), Some(14)); + assert_eq!(keypoint_name_to_coco_index("left_ankle"), Some(15)); + assert_eq!(keypoint_name_to_coco_index("right_ankle"), Some(16)); + assert_eq!(keypoint_name_to_coco_index("unknown"), None); + // Case insensitive + assert_eq!(keypoint_name_to_coco_index("NOSE"), Some(0)); + assert_eq!(keypoint_name_to_coco_index("Left_Eye"), Some(1)); + } + + #[test] + fn test_detections_to_tracker_keypoints() { + let person = make_person( + 1, + vec![ + make_keypoint("nose", 1.0, 2.0, 0.5), + make_keypoint("left_shoulder", 0.8, 2.5, 0.4), + make_keypoint("right_shoulder", 1.2, 2.5, 0.6), + ], + ); + + let result = detections_to_tracker_keypoints(&[person]); + assert_eq!(result.len(), 1); + + let kps = &result[0]; + + // Mapped keypoints should have correct values + assert!((kps[0][0] - 1.0).abs() < 1e-5); // nose x + assert!((kps[0][1] - 2.0).abs() < 1e-5); // nose y + assert!((kps[0][2] - 0.5).abs() < 1e-5); // nose z + + assert!((kps[5][0] - 0.8).abs() < 1e-5); // left_shoulder x + assert!((kps[6][0] - 1.2).abs() < 1e-5); // right_shoulder x + + // Unmapped keypoints should be at centroid of mapped keypoints + // centroid = ((1.0+0.8+1.2)/3, (2.0+2.5+2.5)/3, (0.5+0.4+0.6)/3) + let cx = (1.0 + 0.8 + 1.2) / 3.0; + let cy = (2.0 + 2.5 + 2.5) / 3.0; + let cz = (0.5 + 0.4 + 0.6) / 3.0; + + // left_eye (index 1) should be at centroid + assert!((kps[1][0] - cx).abs() < 1e-4); + assert!((kps[1][1] - cy).abs() < 1e-4); + assert!((kps[1][2] - cz).abs() < 1e-4); + } + + #[test] + fn test_tracker_update_stable_ids() { + let mut tracker = PoseTracker::new(); + let mut last_instant: Option = None; + + let person = make_person( + 0, + vec![ + make_keypoint("nose", 1.0, 2.0, 0.0), + make_keypoint("left_shoulder", 0.8, 2.5, 0.0), + make_keypoint("right_shoulder", 1.2, 2.5, 0.0), + make_keypoint("left_hip", 0.9, 3.5, 0.0), + make_keypoint("right_hip", 1.1, 3.5, 0.0), + ], + ); + + // First update: creates a new track + let result1 = tracker_update(&mut tracker, &mut last_instant, vec![person.clone()]); + assert_eq!(result1.len(), 1); + let id1 = result1[0].id; + + // Second update: should match the existing track + let result2 = tracker_update(&mut tracker, &mut last_instant, vec![person.clone()]); + assert_eq!(result2.len(), 1); + let id2 = result2[0].id; + + // Third update: same track ID should persist + let result3 = tracker_update(&mut tracker, &mut last_instant, vec![person.clone()]); + assert_eq!(result3.len(), 1); + let id3 = result3[0].id; + + // All three updates should return the same track ID + assert_eq!(id1, id2, "Track ID should be stable across updates"); + assert_eq!(id2, id3, "Track ID should be stable across updates"); + } +} diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/Cargo.toml b/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/Cargo.toml index 11114e9bb..b3c16e0dd 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/Cargo.toml +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/Cargo.toml @@ -11,6 +11,12 @@ keywords = ["wifi", "csi", "signal-processing", "densepose", "rust"] categories = ["science", "computer-vision"] readme = "README.md" +[features] +default = ["eigenvalue"] +## Enable eigenvalue-based person counting (requires BLAS via ndarray-linalg). +## Disable with --no-default-features to use the diagonal fallback instead. +eigenvalue = ["ndarray-linalg"] + [dependencies] # Core utilities thiserror.workspace = true @@ -20,6 +26,7 @@ chrono = { version = "0.4", features = ["serde"] } # Signal processing ndarray = { workspace = true } +ndarray-linalg = { workspace = true, optional = true } rustfft.workspace = true num-complex.workspace = true num-traits.workspace = true diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/src/ruvsense/field_model.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/src/ruvsense/field_model.rs index 7494235e0..028c772db 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/src/ruvsense/field_model.rs +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-signal/src/ruvsense/field_model.rs @@ -17,6 +17,12 @@ //! of Squares and Products." Technometrics. //! - ADR-030: RuvSense Persistent Field Model +use ndarray::Array2; +#[cfg(feature = "eigenvalue")] +use ndarray_linalg::Eigh; +#[cfg(feature = "eigenvalue")] +use ndarray_linalg::UPLO; + // --------------------------------------------------------------------------- // Error types // --------------------------------------------------------------------------- @@ -47,6 +53,14 @@ pub enum FieldModelError { /// Invalid configuration parameter. #[error("Invalid configuration: {0}")] InvalidConfig(String), + + /// Model has not been calibrated yet. + #[error("Field model not calibrated")] + NotCalibrated, + + /// Not enough data for the requested operation. + #[error("Insufficient data: need {need}, have {have}")] + InsufficientData { need: usize, have: usize }, } // --------------------------------------------------------------------------- @@ -260,6 +274,8 @@ pub struct FieldNormalMode { pub calibrated_at_us: u64, /// Hash of mesh geometry at calibration time. pub geometry_hash: u64, + /// Baseline eigenvalue count above Marcenko-Pastur threshold (empty-room). + pub baseline_eigenvalue_count: usize, } /// Body perturbation extracted from a CSI observation. @@ -310,6 +326,60 @@ pub struct FieldModel { status: CalibrationStatus, /// Timestamp of last calibration completion (microseconds). last_calibration_us: u64, + /// Running outer-product sum for full covariance SVD: [n_sub x n_sub]. + covariance_sum: Option>, + /// Number of frames accumulated into covariance_sum. + covariance_count: u64, +} + +/// Diagonal variance fallback for when full covariance SVD is unavailable. +/// +/// Returns `(mode_energies, environmental_modes, baseline_eigenvalue_count)`. +fn diagonal_fallback( + link_stats: &[LinkBaselineStats], + n_sc: usize, + n_modes: usize, +) -> (Vec, Vec>, usize) { + // Average variance across links (diagonal approximation) + let mut avg_variance = vec![0.0_f64; n_sc]; + for ls in link_stats { + let var = ls.variance_vector(); + for (i, v) in var.iter().enumerate() { + avg_variance[i] += v; + } + } + let n_links_f = link_stats.len() as f64; + if n_links_f > 0.0 { + for v in avg_variance.iter_mut() { + *v /= n_links_f; + } + } + + // Sort subcarrier indices by variance (descending) to pick top-K modes + let mut indices: Vec = (0..n_sc).collect(); + indices.sort_by(|&a, &b| { + avg_variance[b] + .partial_cmp(&avg_variance[a]) + .unwrap_or(std::cmp::Ordering::Equal) + }); + + let mut environmental_modes = Vec::with_capacity(n_modes); + let mut mode_energies = Vec::with_capacity(n_modes); + + for k in 0..n_modes.min(n_sc) { + let idx = indices[k]; + let mut mode = vec![0.0_f64; n_sc]; + mode[idx] = 1.0; + mode_energies.push(avg_variance[idx]); + environmental_modes.push(mode); + } + + // For diagonal fallback, estimate baseline eigenvalue count from variance + let total_var: f64 = avg_variance.iter().sum(); + let mean_var = if n_sc > 0 { total_var / n_sc as f64 } else { 0.0 }; + let baseline_count = avg_variance.iter().filter(|&&v| v > mean_var * 2.0).count(); + + (mode_energies, environmental_modes, baseline_count) } impl FieldModel { @@ -339,6 +409,8 @@ impl FieldModel { modes: None, status: CalibrationStatus::Uncalibrated, last_calibration_us: 0, + covariance_sum: None, + covariance_count: 0, }) } @@ -375,6 +447,30 @@ impl FieldModel { if self.status == CalibrationStatus::Uncalibrated { self.status = CalibrationStatus::Collecting; } + + // Accumulate raw outer products for SVD covariance (no centering here — + // mean subtraction is deferred to finalize_calibration to avoid bias). + // We average across links so covariance_count tracks frames, not links. + let n = self.config.n_subcarriers; + let cov = self.covariance_sum.get_or_insert_with(|| Array2::zeros((n, n))); + let n_links = observations.len(); + for obs in observations { + if obs.len() >= n { + // Rank-1 update: cov += obs * obs^T (raw, un-centered) + for i in 0..n { + for j in i..n { + let val = obs[i] * obs[j]; + cov[[i, j]] += val; + if i != j { + cov[[j, i]] += val; + } + } + } + } + } + // Count once per frame (not per link) for correct MP ratio + self.covariance_count += 1; + Ok(()) } @@ -396,58 +492,134 @@ impl FieldModel { }); } - // Build covariance matrix from per-link variance data. - // We average the variance vectors across all links to get the - // covariance diagonal, then compute eigenmodes via power iteration. let n_sc = self.config.n_subcarriers; let n_modes = self.config.n_modes.min(n_sc); // Collect per-link baselines let baseline: Vec> = self.link_stats.iter().map(|ls| ls.mean_vector()).collect(); - // Average covariance across links (diagonal approximation) - let mut avg_variance = vec![0.0_f64; n_sc]; - for ls in &self.link_stats { - let var = ls.variance_vector(); - for (i, v) in var.iter().enumerate() { - avg_variance[i] += v; + // --- True eigenvalue decomposition (with diagonal fallback) --- + let (mode_energies, environmental_modes, baseline_eig_count) = + if let Some(ref cov_sum) = self.covariance_sum { + if self.covariance_count > 1 { + // Compute sample covariance from raw outer products: + // cov = (sum_xx / N - mean * mean^T) * N / (N-1) + // where sum_xx accumulated obs * obs^T across all links per frame. + // We average per-link means for centering. + let n_frames = self.covariance_count as f64; + let n_links = self.config.n_links as f64; + // Average mean across all links + let mut avg_mean = vec![0.0f64; n_sc]; + for ls in &self.link_stats { + let m = ls.mean_vector(); + for i in 0..n_sc { avg_mean[i] += m[i]; } + } + for i in 0..n_sc { avg_mean[i] /= n_links; } + // cov = sum_xx / (N * n_links) - mean * mean^T, then Bessel correction + let total_obs = n_frames * n_links; + let mut covariance = cov_sum / total_obs; + for i in 0..n_sc { + for j in 0..n_sc { + covariance[[i, j]] -= avg_mean[i] * avg_mean[j]; + } + } + // Bessel's correction: multiply by N/(N-1) where N = total observations + let bessel = total_obs / (total_obs - 1.0); + covariance *= bessel; + + // Symmetric eigendecomposition (requires eigenvalue feature / BLAS) + #[cfg(feature = "eigenvalue")] + match covariance.eigh(UPLO::Upper) { + Ok((eigenvalues, eigenvectors)) => { + // eigenvalues are in ascending order from ndarray-linalg + // Reverse to get descending + let len = eigenvalues.len(); + let mut sorted_indices: Vec = (0..len).collect(); + sorted_indices.sort_by(|&a, &b| { + eigenvalues[b] + .partial_cmp(&eigenvalues[a]) + .unwrap_or(std::cmp::Ordering::Equal) + }); + + // Extract top n_modes + let modes: Vec> = sorted_indices + .iter() + .take(n_modes) + .map(|&idx| eigenvectors.column(idx).to_vec()) + .collect(); + let energies: Vec = sorted_indices + .iter() + .take(n_modes) + .map(|&idx| eigenvalues[idx].max(0.0)) + .collect(); + + // Marcenko-Pastur noise estimate: median of POSITIVE + // eigenvalues in the bottom half. Excludes zeros from + // rank-deficient matrices (when p > n). + let noise_var = { + let mut positive: Vec = eigenvalues + .iter().copied().filter(|&e| e > 1e-10).collect(); + positive.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + if positive.len() >= 4 { + let half = positive.len() / 2; + positive[..half].iter().sum::() / half as f64 + } else if !positive.is_empty() { + positive[0] + } else { + 1e-10 + } + }; + // MP ratio: p/n where n = total observations (frames * links) + let total_obs_mp = self.covariance_count as f64 * self.config.n_links as f64; + let ratio = n_sc as f64 / total_obs_mp; + let mp_threshold = noise_var * (1.0 + ratio.sqrt()).powi(2); + let baseline_count = eigenvalues + .iter() + .filter(|&&ev| ev > mp_threshold) + .count(); + + (energies, modes, baseline_count) + } + Err(_) => { + // Fallback to diagonal approximation on SVD failure + diagonal_fallback(&self.link_stats, n_sc, n_modes) + } + } + // When eigenvalue feature is disabled, use diagonal fallback + #[cfg(not(feature = "eigenvalue"))] + { diagonal_fallback(&self.link_stats, n_sc, n_modes) } + } else { + diagonal_fallback(&self.link_stats, n_sc, n_modes) + } + } else { + diagonal_fallback(&self.link_stats, n_sc, n_modes) + }; + + // Compute variance explained using the same centered covariance as modes. + // total_variance = trace(centered_covariance) = sum of ALL eigenvalues. + let total_energy: f64 = mode_energies.iter().sum(); + let total_variance = if let Some(ref cov_sum) = self.covariance_sum { + if self.covariance_count > 1 { + let n_links_f = self.config.n_links as f64; + let total_obs = self.covariance_count as f64 * n_links_f; + // Centered trace: E[x^2] - E[x]^2, with Bessel correction + let mut avg_mean = vec![0.0f64; n_sc]; + for ls in &self.link_stats { + let m = ls.mean_vector(); + for i in 0..n_sc { avg_mean[i] += m[i]; } + } + for i in 0..n_sc { avg_mean[i] /= n_links_f; } + let raw_trace: f64 = (0..n_sc).map(|i| cov_sum[[i, i]] / total_obs).sum(); + let mean_sq: f64 = avg_mean.iter().map(|m| m * m).sum(); + (raw_trace - mean_sq).max(0.0) * total_obs / (total_obs - 1.0) + } else { + total_energy } - } - let n_links_f = self.config.n_links as f64; - for v in avg_variance.iter_mut() { - *v /= n_links_f; - } - - // Extract modes via simplified power iteration on the diagonal - // covariance. Since we use a diagonal approximation, the eigenmodes - // are aligned with the standard basis, sorted by variance. - let total_variance: f64 = avg_variance.iter().sum(); - - // Sort subcarrier indices by variance (descending) to pick top-K modes - let mut indices: Vec = (0..n_sc).collect(); - indices.sort_by(|&a, &b| { - avg_variance[b] - .partial_cmp(&avg_variance[a]) - .unwrap_or(std::cmp::Ordering::Equal) - }); - - let mut environmental_modes = Vec::with_capacity(n_modes); - let mut mode_energies = Vec::with_capacity(n_modes); - let mut explained = 0.0_f64; - - for k in 0..n_modes { - let idx = indices[k]; - // Create a unit vector along the highest-variance subcarrier - let mut mode = vec![0.0_f64; n_sc]; - mode[idx] = 1.0; - let energy = avg_variance[idx]; - environmental_modes.push(mode); - mode_energies.push(energy); - explained += energy; - } - + } else { + total_energy + }; let variance_explained = if total_variance > 1e-15 { - explained / total_variance + total_energy / total_variance } else { 0.0 }; @@ -459,6 +631,7 @@ impl FieldModel { variance_explained, calibrated_at_us: timestamp_us, geometry_hash, + baseline_eigenvalue_count: baseline_eig_count, }; self.modes = Some(field_mode); @@ -541,6 +714,100 @@ impl FieldModel { }) } + /// Estimate room occupancy from eigenvalue analysis of recent CSI frames. + /// + /// `recent_frames`: sliding window of amplitude vectors (recommend 50 frames + /// ~ 2.5s at 20 Hz). Returns estimated person count (0 = empty room). + /// + /// Requires the `eigenvalue` feature (BLAS). Returns `NotCalibrated` when + /// the feature is disabled. + #[cfg(feature = "eigenvalue")] + pub fn estimate_occupancy(&self, recent_frames: &[Vec]) -> Result { + let modes = self.modes.as_ref().ok_or(FieldModelError::NotCalibrated)?; + + let n = self.config.n_subcarriers; + if recent_frames.len() < 10 { + return Err(FieldModelError::InsufficientData { + need: 10, + have: recent_frames.len(), + }); + } + + // Build covariance matrix from recent frames + let mut mean = vec![0.0f64; n]; + let mut count = 0usize; + for frame in recent_frames { + if frame.len() >= n { + for i in 0..n { + mean[i] += frame[i]; + } + count += 1; + } + } + if count < 2 { + return Ok(0); + } + for m in &mut mean { + *m /= count as f64; + } + + let mut cov = Array2::::zeros((n, n)); + for frame in recent_frames { + if frame.len() >= n { + for i in 0..n { + let ci = frame[i] - mean[i]; + for j in i..n { + let val = ci * (frame[j] - mean[j]); + cov[[i, j]] += val; + if i != j { + cov[[j, i]] += val; + } + } + } + } + } + let scale = 1.0 / (count as f64 - 1.0); + cov *= scale; + + // Eigendecompose + let eigenvalues = match cov.eigh(UPLO::Upper) { + Ok((evals, _)) => evals, + Err(_) => return Ok(0), // SVD failure = can't estimate + }; + + // Marcenko-Pastur noise estimate: median of POSITIVE eigenvalues + // in the bottom half. Excludes zeros from rank-deficient matrices + // (common when n_subcarriers > n_frames, e.g. 56 subcarriers / 50 frames). + let noise_var = { + let mut positive: Vec = eigenvalues.iter() + .copied() + .filter(|&e| e > 1e-10) + .collect(); + positive.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + if positive.len() >= 4 { + let half = positive.len() / 2; + positive[..half].iter().sum::() / half as f64 + } else if !positive.is_empty() { + positive[0] + } else { + return Ok(0); // All zero eigenvalues — can't estimate + } + }; + let ratio = n as f64 / count as f64; + let mp_threshold = noise_var * (1.0 + ratio.sqrt()).powi(2); + + let significant = eigenvalues.iter().filter(|&&ev| ev > mp_threshold).count(); + let occupancy = significant.saturating_sub(modes.baseline_eigenvalue_count); + + Ok(occupancy.min(10)) // Cap at 10 persons + } + + /// Stub when eigenvalue feature is disabled — always returns NotCalibrated. + #[cfg(not(feature = "eigenvalue"))] + pub fn estimate_occupancy(&self, _recent_frames: &[Vec]) -> Result { + Err(FieldModelError::NotCalibrated) + } + /// Check calibration freshness against a given timestamp. pub fn check_freshness(&self, current_us: u64) -> CalibrationStatus { if self.modes.is_none() { @@ -563,6 +830,8 @@ impl FieldModel { .collect(); self.modes = None; self.status = CalibrationStatus::Uncalibrated; + self.covariance_sum = None; + self.covariance_count = 0; } } @@ -873,6 +1142,179 @@ mod tests { } } + #[test] + fn test_covariance_accumulation() { + let config = make_config(2, 4, 5); + let mut model = FieldModel::new(config).unwrap(); + + // Feed calibration data + for i in 0..10 { + let obs = make_observations(2, 4, 1.0 + 0.1 * i as f64); + model.feed_calibration(&obs).unwrap(); + } + + // covariance_sum should be populated + assert!(model.covariance_sum.is_some()); + assert!(model.covariance_count > 0); + let cov = model.covariance_sum.as_ref().unwrap(); + assert_eq!(cov.shape(), &[4, 4]); + // Diagonal entries should be non-negative (sum of squares) + for i in 0..4 { + assert!(cov[[i, i]] >= 0.0, "Diagonal covariance entry must be >= 0"); + } + // Matrix should be symmetric + for i in 0..4 { + for j in 0..4 { + assert!( + (cov[[i, j]] - cov[[j, i]]).abs() < 1e-10, + "Covariance matrix must be symmetric" + ); + } + } + } + + #[test] + fn test_svd_finalize_produces_orthonormal_modes() { + let config = FieldModelConfig { + n_links: 1, + n_subcarriers: 8, + n_modes: 3, + min_calibration_frames: 20, + baseline_expiry_s: 86_400.0, + }; + let mut model = FieldModel::new(config).unwrap(); + + // Feed frames with correlated subcarrier patterns to produce + // non-trivial eigenmodes + for i in 0..50 { + let t = i as f64 * 0.1; + let obs = vec![vec![ + 1.0 + t.sin(), + 2.0 + t.cos(), + 3.0 + 0.5 * t.sin(), + 4.0 + 0.3 * t.cos(), + 5.0 + 0.1 * t, + 6.0, + 7.0 + 0.2 * (2.0 * t).sin(), + 8.0 + 0.1 * (2.0 * t).cos(), + ]]; + model.feed_calibration(&obs).unwrap(); + } + model.finalize_calibration(1_000_000, 0).unwrap(); + + let modes = model.modes().unwrap(); + // Each mode should be approximately unit length + for (k, mode) in modes.environmental_modes.iter().enumerate() { + let norm: f64 = mode.iter().map(|x| x * x).sum::().sqrt(); + assert!( + (norm - 1.0).abs() < 0.01, + "Mode {} has norm {} (expected ~1.0)", + k, + norm + ); + } + // Modes should be approximately orthogonal + for i in 0..modes.environmental_modes.len() { + for j in (i + 1)..modes.environmental_modes.len() { + let dot: f64 = modes.environmental_modes[i] + .iter() + .zip(modes.environmental_modes[j].iter()) + .map(|(a, b)| a * b) + .sum(); + assert!( + dot.abs() < 0.05, + "Modes {} and {} have dot product {} (expected ~0)", + i, + j, + dot + ); + } + } + } + + #[test] + fn test_estimate_occupancy_noise_only() { + let config = FieldModelConfig { + n_links: 1, + n_subcarriers: 8, + n_modes: 3, + min_calibration_frames: 20, + baseline_expiry_s: 86_400.0, + }; + let mut model = FieldModel::new(config).unwrap(); + + // Calibrate with some deterministic noise-like pattern + for i in 0..50 { + let t = i as f64 * 0.1; + let obs = vec![vec![ + 1.0 + 0.01 * t.sin(), + 2.0 + 0.01 * t.cos(), + 3.0 + 0.01 * (2.0 * t).sin(), + 4.0 + 0.01 * (2.0 * t).cos(), + 5.0 + 0.01 * (3.0 * t).sin(), + 6.0 + 0.01 * (3.0 * t).cos(), + 7.0 + 0.01 * (4.0 * t).sin(), + 8.0 + 0.01 * (4.0 * t).cos(), + ]]; + model.feed_calibration(&obs).unwrap(); + } + model.finalize_calibration(1_000_000, 0).unwrap(); + + // Estimate occupancy with similar noise-only frames + let frames: Vec> = (0..20) + .map(|i| { + let t = (i + 50) as f64 * 0.1; + vec![ + 1.0 + 0.01 * t.sin(), + 2.0 + 0.01 * t.cos(), + 3.0 + 0.01 * (2.0 * t).sin(), + 4.0 + 0.01 * (2.0 * t).cos(), + 5.0 + 0.01 * (3.0 * t).sin(), + 6.0 + 0.01 * (3.0 * t).cos(), + 7.0 + 0.01 * (4.0 * t).sin(), + 8.0 + 0.01 * (4.0 * t).cos(), + ] + }) + .collect(); + let occupancy = model.estimate_occupancy(&frames).unwrap(); + assert_eq!(occupancy, 0, "Noise-only frames should yield 0 occupancy"); + } + + #[test] + fn test_baseline_eigenvalue_count_stored() { + let config = FieldModelConfig { + n_links: 1, + n_subcarriers: 8, + n_modes: 3, + min_calibration_frames: 20, + baseline_expiry_s: 86_400.0, + }; + let mut model = FieldModel::new(config).unwrap(); + + // Feed frames with structured variance so eigenvalues are meaningful + for i in 0..50 { + let t = i as f64 * 0.1; + let obs = vec![vec![ + 1.0 + t.sin(), + 2.0 + t.cos(), + 3.0 + 0.5 * t.sin(), + 4.0 + 0.3 * t.cos(), + 5.0 + 0.1 * t, + 6.0, + 7.0, + 8.0, + ]]; + model.feed_calibration(&obs).unwrap(); + } + let modes = model.finalize_calibration(1_000_000, 0).unwrap(); + // baseline_eigenvalue_count should exist and be a reasonable value + // (at least 0, at most n_subcarriers) + assert!( + modes.baseline_eigenvalue_count <= 8, + "baseline_eigenvalue_count should be <= n_subcarriers" + ); + } + #[test] fn test_environmental_projection_removes_drift() { let config = make_config(1, 4, 10); diff --git a/ui/components/SensingTab.js b/ui/components/SensingTab.js index 6c3115c12..33387eefe 100644 --- a/ui/components/SensingTab.js +++ b/ui/components/SensingTab.js @@ -110,12 +110,18 @@ export class SensingTab {
About This Data

Metrics are computed from WiFi Channel State Information (CSI). - With 1 ESP32 you get presence detection, breathing + With 0 ESP32 node(s) you get presence detection, breathing estimation, and gross motion. Add 3-4+ ESP32 nodes around the room for spatial resolution and limb-level tracking.

+ +
+
NODE STATUS
+
+
+
Details
@@ -193,6 +199,9 @@ export class SensingTab { // Update HUD this._updateHUD(data); + + // Update per-node panels + this._updateNodePanels(data); } _onStateChange(state) { @@ -233,6 +242,11 @@ export class SensingTab { const f = data.features || {}; const c = data.classification || {}; + // Node count + const nodeCount = (data.nodes || []).length; + const countEl = this.container.querySelector('#sensingNodeCount'); + if (countEl) countEl.textContent = String(nodeCount); + // RSSI this._setText('sensingRssi', `${(f.mean_rssi || -80).toFixed(1)} dBm`); this._setText('sensingSource', data.source || ''); @@ -309,6 +323,57 @@ export class SensingTab { ctx.stroke(); } + // ---- Per-node panels --------------------------------------------------- + + _updateNodePanels(data) { + const container = this.container.querySelector('#nodeStatusContainer'); + if (!container) return; + const nodeFeatures = data.node_features || []; + if (nodeFeatures.length === 0) { + container.textContent = ''; + const msg = document.createElement('div'); + msg.style.cssText = 'color:#888;font-size:12px;padding:8px;'; + msg.textContent = 'No nodes detected'; + container.appendChild(msg); + return; + } + const NODE_COLORS = ['#00ccff', '#ff6600', '#00ff88', '#ff00cc', '#ffcc00', '#8800ff', '#00ffcc', '#ff0044']; + container.textContent = ''; + for (const nf of nodeFeatures) { + const color = NODE_COLORS[nf.node_id % NODE_COLORS.length]; + const statusColor = nf.stale ? '#888' : '#0f0'; + + const row = document.createElement('div'); + row.style.cssText = `display:flex;align-items:center;gap:8px;padding:6px 8px;margin-bottom:4px;background:rgba(255,255,255,0.03);border-radius:6px;border-left:3px solid ${color};`; + + const idCol = document.createElement('div'); + idCol.style.minWidth = '50px'; + const nameEl = document.createElement('div'); + nameEl.style.cssText = `font-size:11px;font-weight:600;color:${color};`; + nameEl.textContent = 'Node ' + nf.node_id; + const statusEl = document.createElement('div'); + statusEl.style.cssText = `font-size:9px;color:${statusColor};`; + statusEl.textContent = nf.stale ? 'STALE' : 'ACTIVE'; + idCol.appendChild(nameEl); + idCol.appendChild(statusEl); + + const metricsCol = document.createElement('div'); + metricsCol.style.cssText = 'flex:1;font-size:10px;color:#aaa;'; + metricsCol.textContent = (nf.rssi_dbm || -80).toFixed(0) + ' dBm · var ' + (nf.features?.variance || 0).toFixed(1); + + const classCol = document.createElement('div'); + classCol.style.cssText = 'font-size:10px;font-weight:600;color:#ccc;'; + const motion = (nf.classification?.motion_level || 'absent').toUpperCase(); + const conf = ((nf.classification?.confidence || 0) * 100).toFixed(0); + classCol.textContent = motion + ' ' + conf + '%'; + + row.appendChild(idCol); + row.appendChild(metricsCol); + row.appendChild(classCol); + container.appendChild(row); + } + } + // ---- Resize ------------------------------------------------------------ _setupResize() { diff --git a/ui/components/gaussian-splats.js b/ui/components/gaussian-splats.js index ecab6e481..5f7227fa3 100644 --- a/ui/components/gaussian-splats.js +++ b/ui/components/gaussian-splats.js @@ -66,6 +66,10 @@ function valueToColor(v) { return [r, g, b]; } +// ---- Node marker color palette ------------------------------------------- + +const NODE_MARKER_COLORS = [0x00ccff, 0xff6600, 0x00ff88, 0xff00cc, 0xffcc00, 0x8800ff, 0x00ffcc, 0xff0044]; + // ---- GaussianSplatRenderer ----------------------------------------------- export class GaussianSplatRenderer { @@ -108,6 +112,10 @@ export class GaussianSplatRenderer { // Node markers (ESP32 / router positions) this._createNodeMarkers(THREE); + // Dynamic per-node markers (multi-node support) + this.nodeMarkers = new Map(); // nodeId -> THREE.Mesh + this._THREE = THREE; + // Body disruption blob this._createBodyBlob(THREE); @@ -369,11 +377,43 @@ export class GaussianSplatRenderer { bGeo.attributes.splatSize.needsUpdate = true; } - // -- Update node positions --------------------------------------------- + // -- Update node positions (legacy single-node) ------------------------ if (nodes.length > 0 && nodes[0].position) { const pos = nodes[0].position; this.nodeMarker.position.set(pos[0], 0.5, pos[2]); } + + // -- Update dynamic per-node markers (multi-node support) -------------- + if (nodes && nodes.length > 0 && this.scene) { + const THREE = this._THREE || window.THREE; + if (THREE) { + const activeIds = new Set(); + for (const node of nodes) { + activeIds.add(node.node_id); + if (!this.nodeMarkers.has(node.node_id)) { + const geo = new THREE.SphereGeometry(0.25, 16, 16); + const mat = new THREE.MeshBasicMaterial({ + color: NODE_MARKER_COLORS[node.node_id % NODE_MARKER_COLORS.length], + transparent: true, + opacity: 0.8, + }); + const marker = new THREE.Mesh(geo, mat); + this.scene.add(marker); + this.nodeMarkers.set(node.node_id, marker); + } + const marker = this.nodeMarkers.get(node.node_id); + const pos = node.position || [0, 0, 0]; + marker.position.set(pos[0], 0.5, pos[2]); + } + // Remove stale markers + for (const [id, marker] of this.nodeMarkers) { + if (!activeIds.has(id)) { + this.scene.remove(marker); + this.nodeMarkers.delete(id); + } + } + } + } } // ---- Render loop ------------------------------------------------------- diff --git a/ui/services/sensing.service.js b/ui/services/sensing.service.js index 4931e86e2..0992483bc 100644 --- a/ui/services/sensing.service.js +++ b/ui/services/sensing.service.js @@ -84,6 +84,11 @@ class SensingService { return [...this._rssiHistory]; } + /** Get per-node RSSI history (object keyed by node_id). */ + getPerNodeRssiHistory() { + return { ...(this._perNodeRssiHistory || {}) }; + } + /** Current connection state. */ get state() { return this._state; @@ -327,6 +332,20 @@ class SensingService { } } + // Per-node RSSI tracking + if (!this._perNodeRssiHistory) this._perNodeRssiHistory = {}; + if (data.node_features) { + for (const nf of data.node_features) { + if (!this._perNodeRssiHistory[nf.node_id]) { + this._perNodeRssiHistory[nf.node_id] = []; + } + this._perNodeRssiHistory[nf.node_id].push(nf.rssi_dbm); + if (this._perNodeRssiHistory[nf.node_id].length > this._maxHistory) { + this._perNodeRssiHistory[nf.node_id].shift(); + } + } + } + // Notify all listeners for (const cb of this._listeners) { try {