From 0e786279e52e064c392e9c77e69710cccf77895c Mon Sep 17 00:00:00 2001 From: ruthwikdasyam Date: Sat, 14 Mar 2026 01:01:30 -0700 Subject: [PATCH 01/12] feat: deag, click on enable --- dimos/src/interaction/keyboard.rs | 88 ++++++++++++++++++------ dimos/src/viewer.rs | 107 +----------------------------- 2 files changed, 72 insertions(+), 123 deletions(-) diff --git a/dimos/src/interaction/keyboard.rs b/dimos/src/interaction/keyboard.rs index b6cdcd809c37..9e7d2b79776a 100644 --- a/dimos/src/interaction/keyboard.rs +++ b/dimos/src/interaction/keyboard.rs @@ -19,7 +19,6 @@ const BASE_ANGULAR_SPEED: f64 = 0.8; // rad/s const FAST_MULTIPLIER: f64 = 2.0; // Shift modifier /// Overlay styling -const OVERLAY_MARGIN: f32 = 12.0; const OVERLAY_PADDING: f32 = 10.0; const OVERLAY_ROUNDING: f32 = 8.0; const OVERLAY_BG: egui::Color32 = egui::Color32::from_rgba_premultiplied(20, 20, 30, 220); @@ -66,11 +65,13 @@ impl KeyState { } /// Handles keyboard input and publishes Twist via LCM. +/// Must be activated by clicking the overlay before keys are captured. pub struct KeyboardHandler { publisher: LcmPublisher, state: KeyState, was_active: bool, estop_flash: bool, // true briefly after space pressed + engaged: bool, // true when user has clicked the overlay to activate } impl KeyboardHandler { @@ -82,29 +83,30 @@ impl KeyboardHandler { state: KeyState::new(), was_active: false, estop_flash: false, + engaged: false, }) } /// Process keyboard input from egui and publish Twist if keys are held. /// Called once per frame from DimosApp.ui(). + /// Only captures keys when the overlay has been clicked (engaged). /// /// Returns true if any movement key is active (for UI overlay). pub fn process(&mut self, ctx: &egui::Context) -> bool { self.estop_flash = false; - // Check if any text widget has focus - if so, skip keyboard capture - let text_has_focus = ctx.memory(|m| m.focused().is_some()); - if text_has_focus { + // If not engaged, don't capture any keys + if !self.engaged { if self.was_active { if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send stop command on focus change: {e:?}"); + re_log::warn!("Failed to send stop on disengage: {e:?}"); } self.was_active = false; } return false; } - // Update key state from egui input + // Update key state from egui input (engaged flag is the only gate) self.update_key_state(ctx); // Check for emergency stop (Space key pressed - one-shot action) @@ -134,33 +136,71 @@ impl KeyboardHandler { self.state.any_active() } - /// Draw keyboard overlay HUD. Always shown (dim when idle, bright when active). - pub fn draw_overlay(&self, ctx: &egui::Context) { + /// Draw keyboard overlay HUD at bottom-right of the 3D viewport area. + /// Clickable: clicking the overlay toggles engaged state. + pub fn draw_overlay(&mut self, ctx: &egui::Context) { + let screen_rect = ctx.content_rect(); + // Default position: bottom-right of the 3D viewport area + let overlay_width = 140.0; + let overlay_height = 160.0; + let right_panel_offset = 320.0; + let bottom_timeline_offset = 120.0; + let default_pos = egui::pos2( + screen_rect.max.x - overlay_width - right_panel_offset, + screen_rect.max.y - overlay_height - bottom_timeline_offset, + ); + egui::Area::new("keyboard_hud".into()) - .fixed_pos(egui::pos2(OVERLAY_MARGIN, OVERLAY_MARGIN)) + .default_pos(default_pos) + .movable(true) .order(egui::Order::Foreground) - .interactable(false) + .interactable(true) .show(ctx, |ui| { - egui::Frame::new() + let border_color = if self.engaged { + egui::Color32::from_rgb(60, 180, 75) // green border when active + } else { + egui::Color32::from_rgb(80, 80, 100) // dim border when inactive + }; + + let response = egui::Frame::new() .fill(OVERLAY_BG) .corner_radius(egui::CornerRadius::same(OVERLAY_ROUNDING as u8)) .inner_margin(egui::Margin::same(OVERLAY_PADDING as i8)) + .stroke(egui::Stroke::new(2.0, border_color)) .show(ui, |ui| { self.draw_hud_content(ui); }); + + // Make the frame rect clickable (Frame doesn't have click sense by default) + let click_response = ui.interact( + response.response.rect, + ui.id().with("wasd_click"), + egui::Sense::click(), + ).on_hover_cursor(egui::CursorIcon::Default); + + // Toggle engaged state on click + if click_response.clicked() { + self.engaged = !self.engaged; + if !self.engaged { + // Send stop when disengaging + if let Err(e) = self.publish_stop() { + re_log::warn!("Failed to send stop on disengage: {e:?}"); + } + self.state.reset(); + self.was_active = false; + } + re_log::info!( + "Keyboard teleop {}", + if self.engaged { "ENGAGED" } else { "DISENGAGED" } + ); + } }); + } fn draw_hud_content(&self, ui: &mut egui::Ui) { - let active = self.state.any_active() || self.estop_flash; - // Title - let title_color = if active { - egui::Color32::WHITE - } else { - egui::Color32::from_rgb(120, 120, 140) - }; - ui.label(egui::RichText::new("🎮 Keyboard Teleop").color(title_color).size(13.0)); + ui.label(egui::RichText::new("Keyboard Teleop").color(LABEL_COLOR).size(13.0)); ui.add_space(4.0); // Key grid: [Q] [W] [E] @@ -352,6 +392,7 @@ mod tests { state, was_active: false, estop_flash: false, + engaged: true, }; let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, BASE_LINEAR_SPEED); @@ -368,6 +409,7 @@ mod tests { state, was_active: false, estop_flash: false, + engaged: true, }; let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); @@ -381,6 +423,7 @@ mod tests { state, was_active: false, estop_flash: false, + engaged: true, }; let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); @@ -397,6 +440,7 @@ mod tests { state, was_active: false, estop_flash: false, + engaged: true, }; let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); @@ -410,6 +454,7 @@ mod tests { state, was_active: false, estop_flash: false, + engaged: true, }; let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); @@ -427,6 +472,7 @@ mod tests { state, was_active: false, estop_flash: false, + engaged: true, }; let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, BASE_LINEAR_SPEED * FAST_MULTIPLIER); @@ -444,6 +490,7 @@ mod tests { state, was_active: false, estop_flash: false, + engaged: true, }; let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, BASE_LINEAR_SPEED); @@ -471,6 +518,7 @@ mod tests { assert!(handler.is_ok()); let handler = handler.unwrap(); assert!(!handler.was_active); + assert!(!handler.engaged); assert!(!handler.state.any_active()); } @@ -484,6 +532,7 @@ mod tests { state, was_active: false, estop_flash: false, + engaged: true, }; let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); @@ -498,6 +547,7 @@ mod tests { state: KeyState::new(), was_active: false, estop_flash: false, + engaged: true, }; let (lin_x, lin_y, lin_z, ang_x, ang_y, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); diff --git a/dimos/src/viewer.rs b/dimos/src/viewer.rs index 7af7282ef188..86a7532458b1 100644 --- a/dimos/src/viewer.rs +++ b/dimos/src/viewer.rs @@ -1,25 +1,15 @@ -use std::cell::RefCell; -use std::rc::Rc; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; - use clap::Parser; -use dimos_viewer::interaction::{LcmPublisher, KeyboardHandler, click_event_from_ms}; +use dimos_viewer::interaction::KeyboardHandler; use rerun::external::{eframe, egui, re_crash_handler, re_grpc_server, re_log, re_memory, re_viewer}; #[global_allocator] static GLOBAL: re_memory::AccountingAllocator = re_memory::AccountingAllocator::new(mimalloc::MiMalloc); -/// LCM channel for click events (follows RViz convention) -const LCM_CHANNEL: &str = "/clicked_point#geometry_msgs.PointStamped"; -/// Minimum time between click events (debouncing) -const CLICK_DEBOUNCE_MS: u64 = 100; -/// Maximum rapid clicks to log as warning -const RAPID_CLICK_THRESHOLD: usize = 5; /// Default gRPC listen port (9877 to avoid conflict with stock Rerun on 9876) const DEFAULT_PORT: u16 = 9877; -/// DimOS Interactive Viewer — a custom Rerun viewer with LCM click-to-navigate. +/// DimOS Interactive Viewer — a custom Rerun viewer with WASD keyboard teleop. /// /// Accepts the same CLI flags as the stock `rerun` binary so it can be spawned /// seamlessly via `rerun_bindings.spawn(executable_name="dimos-viewer")`. @@ -73,7 +63,7 @@ impl eframe::App for DimosApp { // Process keyboard input before delegating to Rerun self.keyboard.process(ui.ctx()); - // Always draw the keyboard HUD overlay (dims when inactive) + // Draw the keyboard HUD overlay (click to engage/disengage) self.keyboard.draw_overlay(ui.ctx()); // Delegate to Rerun's main ui method @@ -124,20 +114,11 @@ async fn main() -> Result<(), Box> { re_grpc_server::shutdown::never(), ); - // Create LCM publisher for click events - let lcm_publisher = LcmPublisher::new(LCM_CHANNEL.to_string()) - .expect("Failed to create LCM publisher"); - re_log::info!("LCM publisher created for channel: {LCM_CHANNEL}"); - // Create keyboard handler let keyboard_handler = KeyboardHandler::new() .expect("Failed to create keyboard handler"); re_log::info!("Keyboard handler initialized for WASD controls on /cmd_vel"); - // State for debouncing and rapid click detection - let last_click_time = Rc::new(RefCell::new(Instant::now())); - let rapid_click_count = Rc::new(RefCell::new(0usize)); - let mut native_options = re_viewer::native::eframe_options(None); native_options.viewport = native_options .viewport @@ -151,88 +132,6 @@ async fn main() -> Result<(), Box> { let startup_options = re_viewer::StartupOptions { memory_limit, - on_event: Some(Rc::new({ - let last_click_time = last_click_time.clone(); - let rapid_click_count = rapid_click_count.clone(); - - move |event: re_viewer::ViewerEvent| { - if let re_viewer::ViewerEventKind::SelectionChange { items } = event.kind { - let mut has_position = false; - let mut no_position_count = 0; - - for item in items { - match item { - re_viewer::SelectionChangeItem::Entity { - entity_path, - view_name: _, - position: Some(pos), - .. - } => { - has_position = true; - - // Debouncing - let now = Instant::now(); - let elapsed = now.duration_since(*last_click_time.borrow()); - - if elapsed < Duration::from_millis(CLICK_DEBOUNCE_MS) { - let mut count = rapid_click_count.borrow_mut(); - *count += 1; - if *count == RAPID_CLICK_THRESHOLD { - re_log::warn!( - "Rapid click detected ({} clicks within {}ms)", - RAPID_CLICK_THRESHOLD, - CLICK_DEBOUNCE_MS - ); - } - continue; - } else { - *rapid_click_count.borrow_mut() = 0; - } - *last_click_time.borrow_mut() = now; - - let timestamp_ms = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64; - - // Build click event and publish via LCM - let click = click_event_from_ms( - [pos.x, pos.y, pos.z], - &entity_path.to_string(), - timestamp_ms, - ); - - match lcm_publisher.publish(&click) { - Ok(_) => { - re_log::debug!( - "LCM click event published: entity={}, pos=({:.2}, {:.2}, {:.2})", - entity_path, - pos.x, - pos.y, - pos.z - ); - } - Err(err) => { - re_log::error!("Failed to publish LCM click event: {err:?}"); - } - } - } - re_viewer::SelectionChangeItem::Entity { position: None, .. } => { - no_position_count += 1; - } - _ => {} - } - } - - if !has_position && no_position_count > 0 { - re_log::trace!( - "Selection change without position data ({no_position_count} items). \ - This is normal for hover/keyboard navigation." - ); - } - } - } - })), ..Default::default() }; From 117df644935d9972a82aa44b716f480a8c9128fa Mon Sep 17 00:00:00 2001 From: ruthwikdasyam Date: Sat, 14 Mar 2026 01:31:29 -0700 Subject: [PATCH 02/12] feat: click anywhere else to disengage --- dimos/src/interaction/keyboard.rs | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/dimos/src/interaction/keyboard.rs b/dimos/src/interaction/keyboard.rs index 9e7d2b79776a..88a556c9c042 100644 --- a/dimos/src/interaction/keyboard.rs +++ b/dimos/src/interaction/keyboard.rs @@ -150,7 +150,7 @@ impl KeyboardHandler { screen_rect.max.y - overlay_height - bottom_timeline_offset, ); - egui::Area::new("keyboard_hud".into()) + let area_response = egui::Area::new("keyboard_hud".into()) .default_pos(default_pos) .movable(true) .order(egui::Order::Foreground) @@ -176,7 +176,12 @@ impl KeyboardHandler { response.response.rect, ui.id().with("wasd_click"), egui::Sense::click(), - ).on_hover_cursor(egui::CursorIcon::Default); + ); + + // Force arrow cursor over the entire overlay (overrides label I-beam) + if click_response.hovered() { + ctx.set_cursor_icon(egui::CursorIcon::Default); + } // Toggle engaged state on click if click_response.clicked() { @@ -194,8 +199,22 @@ impl KeyboardHandler { if self.engaged { "ENGAGED" } else { "DISENGAGED" } ); } - }); - + }) + .response; + + // Disengage when clicking anywhere outside the overlay + if self.engaged + && !ctx.rect_contains_pointer(area_response.layer_id, area_response.interact_rect) + && ctx.input(|i| i.pointer.primary_clicked()) + { + self.engaged = false; + if let Err(e) = self.publish_stop() { + re_log::warn!("Failed to send stop on outside click: {e:?}"); + } + self.state.reset(); + self.was_active = false; + re_log::info!("Keyboard teleop DISENGAGED (clicked outside)"); + } } fn draw_hud_content(&self, ui: &mut egui::Ui) { From d57c63de81056958acf45382ef8d2e2eab6a9c52 Mon Sep 17 00:00:00 2001 From: ruthwikdasyam Date: Sat, 14 Mar 2026 01:44:21 -0700 Subject: [PATCH 03/12] feat: click to navigate --- dimos/src/viewer.rs | 105 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 103 insertions(+), 2 deletions(-) diff --git a/dimos/src/viewer.rs b/dimos/src/viewer.rs index 86a7532458b1..f2c5f98ca31c 100644 --- a/dimos/src/viewer.rs +++ b/dimos/src/viewer.rs @@ -1,15 +1,25 @@ +use std::cell::RefCell; +use std::rc::Rc; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + use clap::Parser; -use dimos_viewer::interaction::KeyboardHandler; +use dimos_viewer::interaction::{LcmPublisher, KeyboardHandler, click_event_from_ms}; use rerun::external::{eframe, egui, re_crash_handler, re_grpc_server, re_log, re_memory, re_viewer}; #[global_allocator] static GLOBAL: re_memory::AccountingAllocator = re_memory::AccountingAllocator::new(mimalloc::MiMalloc); +/// LCM channel for click events (follows RViz convention) +const LCM_CHANNEL: &str = "/clicked_point#geometry_msgs.PointStamped"; +/// Minimum time between click events (debouncing) +const CLICK_DEBOUNCE_MS: u64 = 100; +/// Maximum rapid clicks to log as warning +const RAPID_CLICK_THRESHOLD: usize = 5; /// Default gRPC listen port (9877 to avoid conflict with stock Rerun on 9876) const DEFAULT_PORT: u16 = 9877; -/// DimOS Interactive Viewer — a custom Rerun viewer with WASD keyboard teleop. +/// DimOS Interactive Viewer — a custom Rerun viewer with LCM click-to-navigate. /// /// Accepts the same CLI flags as the stock `rerun` binary so it can be spawned /// seamlessly via `rerun_bindings.spawn(executable_name="dimos-viewer")`. @@ -114,11 +124,20 @@ async fn main() -> Result<(), Box> { re_grpc_server::shutdown::never(), ); + // Create LCM publisher for click events + let lcm_publisher = LcmPublisher::new(LCM_CHANNEL.to_string()) + .expect("Failed to create LCM publisher"); + re_log::info!("LCM publisher created for channel: {LCM_CHANNEL}"); + // Create keyboard handler let keyboard_handler = KeyboardHandler::new() .expect("Failed to create keyboard handler"); re_log::info!("Keyboard handler initialized for WASD controls on /cmd_vel"); + // State for debouncing and rapid click detection + let last_click_time = Rc::new(RefCell::new(Instant::now())); + let rapid_click_count = Rc::new(RefCell::new(0usize)); + let mut native_options = re_viewer::native::eframe_options(None); native_options.viewport = native_options .viewport @@ -132,6 +151,88 @@ async fn main() -> Result<(), Box> { let startup_options = re_viewer::StartupOptions { memory_limit, + on_event: Some(Rc::new({ + let last_click_time = last_click_time.clone(); + let rapid_click_count = rapid_click_count.clone(); + + move |event: re_viewer::ViewerEvent| { + if let re_viewer::ViewerEventKind::SelectionChange { items } = event.kind { + let mut has_position = false; + let mut no_position_count = 0; + + for item in items { + match item { + re_viewer::SelectionChangeItem::Entity { + entity_path, + view_name: _, + position: Some(pos), + .. + } => { + has_position = true; + + // Debouncing + let now = Instant::now(); + let elapsed = now.duration_since(*last_click_time.borrow()); + + if elapsed < Duration::from_millis(CLICK_DEBOUNCE_MS) { + let mut count = rapid_click_count.borrow_mut(); + *count += 1; + if *count == RAPID_CLICK_THRESHOLD { + re_log::warn!( + "Rapid click detected ({} clicks within {}ms)", + RAPID_CLICK_THRESHOLD, + CLICK_DEBOUNCE_MS + ); + } + continue; + } else { + *rapid_click_count.borrow_mut() = 0; + } + *last_click_time.borrow_mut() = now; + + let timestamp_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + // Build click event and publish via LCM + let click = click_event_from_ms( + [pos.x, pos.y, pos.z], + &entity_path.to_string(), + timestamp_ms, + ); + + match lcm_publisher.publish(&click) { + Ok(_) => { + re_log::debug!( + "LCM click event published: entity={}, pos=({:.2}, {:.2}, {:.2})", + entity_path, + pos.x, + pos.y, + pos.z + ); + } + Err(err) => { + re_log::error!("Failed to publish LCM click event: {err:?}"); + } + } + } + re_viewer::SelectionChangeItem::Entity { position: None, .. } => { + no_position_count += 1; + } + _ => {} + } + } + + if !has_position && no_position_count > 0 { + re_log::trace!( + "Selection change without position data ({no_position_count} items). \ + This is normal for hover/keyboard navigation." + ); + } + } + } + })), ..Default::default() }; From c1baf07b6777668430c6b05b37e8d9e215e3ec0d Mon Sep 17 00:00:00 2001 From: Ruthwik Date: Sat, 14 Mar 2026 12:51:26 -0700 Subject: [PATCH 04/12] fix: remove logging --- dimos/src/interaction/keyboard.rs | 5 ----- dimos/src/viewer.rs | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/dimos/src/interaction/keyboard.rs b/dimos/src/interaction/keyboard.rs index 88a556c9c042..b3f631b711ec 100644 --- a/dimos/src/interaction/keyboard.rs +++ b/dimos/src/interaction/keyboard.rs @@ -194,10 +194,6 @@ impl KeyboardHandler { self.state.reset(); self.was_active = false; } - re_log::info!( - "Keyboard teleop {}", - if self.engaged { "ENGAGED" } else { "DISENGAGED" } - ); } }) .response; @@ -213,7 +209,6 @@ impl KeyboardHandler { } self.state.reset(); self.was_active = false; - re_log::info!("Keyboard teleop DISENGAGED (clicked outside)"); } } diff --git a/dimos/src/viewer.rs b/dimos/src/viewer.rs index f2c5f98ca31c..0b71b71f8484 100644 --- a/dimos/src/viewer.rs +++ b/dimos/src/viewer.rs @@ -132,7 +132,7 @@ async fn main() -> Result<(), Box> { // Create keyboard handler let keyboard_handler = KeyboardHandler::new() .expect("Failed to create keyboard handler"); - re_log::info!("Keyboard handler initialized for WASD controls on /cmd_vel"); + // State for debouncing and rapid click detection let last_click_time = Rc::new(RefCell::new(Instant::now())); From a952495af4abfab8cf3f8312d92fe6a7b10cbb85 Mon Sep 17 00:00:00 2001 From: Ruthwik Date: Sat, 14 Mar 2026 12:57:40 -0700 Subject: [PATCH 05/12] fix: reposition default --- dimos/src/interaction/keyboard.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dimos/src/interaction/keyboard.rs b/dimos/src/interaction/keyboard.rs index b3f631b711ec..67afe4e39c31 100644 --- a/dimos/src/interaction/keyboard.rs +++ b/dimos/src/interaction/keyboard.rs @@ -140,13 +140,12 @@ impl KeyboardHandler { /// Clickable: clicking the overlay toggles engaged state. pub fn draw_overlay(&mut self, ctx: &egui::Context) { let screen_rect = ctx.content_rect(); - // Default position: bottom-right of the 3D viewport area - let overlay_width = 140.0; + // Default position: bottom-left, just above the timeline bar let overlay_height = 160.0; - let right_panel_offset = 320.0; + let left_margin = 12.0; let bottom_timeline_offset = 120.0; let default_pos = egui::pos2( - screen_rect.max.x - overlay_width - right_panel_offset, + screen_rect.min.x + left_margin, screen_rect.max.y - overlay_height - bottom_timeline_offset, ); From d22140933a37967573cfb8275e6a21ec397bc4b8 Mon Sep 17 00:00:00 2001 From: Ruthwik Date: Sat, 14 Mar 2026 13:16:51 -0700 Subject: [PATCH 06/12] fix: default position --- dimos/src/interaction/keyboard.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/dimos/src/interaction/keyboard.rs b/dimos/src/interaction/keyboard.rs index 67afe4e39c31..1e7d51f3c71f 100644 --- a/dimos/src/interaction/keyboard.rs +++ b/dimos/src/interaction/keyboard.rs @@ -150,6 +150,7 @@ impl KeyboardHandler { ); let area_response = egui::Area::new("keyboard_hud".into()) + .pivot(egui::Align2::LEFT_BOTTOM) .default_pos(default_pos) .movable(true) .order(egui::Order::Foreground) From 88eda7d331c89e29b5195d3ee9e799daba500a98 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Sat, 14 Mar 2026 15:03:05 -0700 Subject: [PATCH 07/12] feat: add --connect arg to dimos-viewer for connecting to existing gRPC proxy When --connect is provided (with optional URL), the viewer connects to an existing gRPC proxy server instead of spawning its own. This mirrors the stock rerun binary's --connect behavior. Defaults to rerun+http://127.0.0.1:/proxy. --- dimos/src/viewer.rs | 46 +++++++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/dimos/src/viewer.rs b/dimos/src/viewer.rs index 0b71b71f8484..c4f4606e98d3 100644 --- a/dimos/src/viewer.rs +++ b/dimos/src/viewer.rs @@ -4,7 +4,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use clap::Parser; use dimos_viewer::interaction::{LcmPublisher, KeyboardHandler, click_event_from_ms}; -use rerun::external::{eframe, egui, re_crash_handler, re_grpc_server, re_log, re_memory, re_viewer}; +use rerun::external::{eframe, egui, re_crash_handler, re_grpc_client, re_grpc_server, re_log, re_memory, re_uri, re_viewer}; #[global_allocator] static GLOBAL: re_memory::AccountingAllocator = @@ -48,6 +48,16 @@ struct Args { /// Hint that data will arrive shortly (suppresses "waiting for data" message). #[arg(long)] expect_data_soon: bool, + + /// Do not start a local gRPC server; instead connect to an existing one. + /// + /// Optionally accepts a URL to a gRPC proxy server. + /// The scheme must be one of `rerun://`, `rerun+http://`, or `rerun+https://`, + /// and the pathname must be `/proxy`. + /// + /// Defaults to `rerun+http://127.0.0.1:/proxy`. + #[arg(long)] + connect: Option>, } /// Wraps re_viewer::App to add keyboard control interception. @@ -110,19 +120,27 @@ async fn main() -> Result<(), Box> { re_log::setup_logging(); re_crash_handler::install_crash_handlers(re_viewer::build_info()); - // Listen for gRPC connections from Rerun's logging SDKs. - let listen_addr = format!("0.0.0.0:{}", args.port); - re_log::info!("Listening for SDK connections on {listen_addr}"); - let server_memory_limit = re_memory::MemoryLimit::parse(&args.server_memory_limit) - .expect("Bad --server-memory-limit"); - let rx_log = re_grpc_server::spawn_with_recv( - listen_addr.parse()?, - re_grpc_server::ServerOptions { - memory_limit: server_memory_limit, - ..Default::default() - }, - re_grpc_server::shutdown::never(), - ); + // Either connect to an existing gRPC proxy, or spawn a local server. + let rx_log = if let Some(url) = args.connect.clone() { + let url = url.unwrap_or_else(|| format!("rerun+http://127.0.0.1:{}/proxy", args.port)); + let proxy_uri: re_uri::ProxyUri = url.parse() + .expect("Bad --connect URL: expected format rerun+http://host:port/proxy"); + re_log::info!("Connecting to existing gRPC proxy at {proxy_uri}"); + re_grpc_client::stream(proxy_uri) + } else { + let listen_addr = format!("0.0.0.0:{}", args.port); + re_log::info!("Listening for SDK connections on {listen_addr}"); + let server_memory_limit = re_memory::MemoryLimit::parse(&args.server_memory_limit) + .expect("Bad --server-memory-limit"); + re_grpc_server::spawn_with_recv( + listen_addr.parse()?, + re_grpc_server::ServerOptions { + memory_limit: server_memory_limit, + ..Default::default() + }, + re_grpc_server::shutdown::never(), + ) + }; // Create LCM publisher for click events let lcm_publisher = LcmPublisher::new(LCM_CHANNEL.to_string()) From 692954b0daf87b4efa018f211f3bdbda25ee8853 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Sun, 15 Mar 2026 11:33:16 -0700 Subject: [PATCH 08/12] fix: resolve CI lint, formatting, and cargo-shear failures - Remove unused `parking_lot` dependency (cargo-shear) - Add `[lints] workspace = true` to dimos/Cargo.toml (rerun lint) - Fix TOML formatting in dimos/pyproject.toml (taplo) - Rename error variables from `e` to `err` (rerun lint convention) - Fix trailing whitespace in keyboard.rs - Add blank lines before doc comments on struct fields/constants - Use Display instead of Debug for error formatting in viewer.rs - Commit updated Cargo.lock Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 3 +-- dimos/Cargo.toml | 4 +++- dimos/pyproject.toml | 22 +++++++++++----------- dimos/src/interaction/handle.rs | 4 ++-- dimos/src/interaction/keyboard.rs | 26 +++++++++++++------------- dimos/src/interaction/lcm.rs | 3 +++ dimos/src/viewer.rs | 5 ++++- 7 files changed, 37 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0748530c4368..b3be7f215f21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3110,12 +3110,11 @@ dependencies = [ [[package]] name = "dimos-viewer" -version = "0.30.0-alpha.1+dev" +version = "0.30.0-alpha.4" dependencies = [ "bincode", "clap", "mimalloc", - "parking_lot", "rerun", "serde", "tokio", diff --git a/dimos/Cargo.toml b/dimos/Cargo.toml index dde4b393b475..3dfc9283fcca 100644 --- a/dimos/Cargo.toml +++ b/dimos/Cargo.toml @@ -29,7 +29,6 @@ rerun = { path = "../crates/top/rerun", default-features = false, features = [ clap = { workspace = true, features = ["derive"] } bincode.workspace = true mimalloc.workspace = true -parking_lot.workspace = true serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = [ "io-util", @@ -40,3 +39,6 @@ tokio = { workspace = true, features = [ "sync", "time", ] } + +[lints] +workspace = true diff --git a/dimos/pyproject.toml b/dimos/pyproject.toml index 83159e2bec27..1b1376b75811 100644 --- a/dimos/pyproject.toml +++ b/dimos/pyproject.toml @@ -8,18 +8,18 @@ version = "0.30.0a4" description = "Interactive Rerun viewer for DimOS with click-to-navigate support" readme = "README.md" requires-python = ">=3.10" -license = {text = "MIT OR Apache-2.0"} -authors = [{name = "Dimensional Inc.", email = "engineering@dimensional.com"}] +license = { text = "MIT OR Apache-2.0" } +authors = [{ name = "Dimensional Inc.", email = "engineering@dimensional.com" }] classifiers = [ - "Development Status :: 4 - Beta", - "Intended Audience :: Developers", - "Topic :: Scientific/Engineering", - "Topic :: Scientific/Engineering :: Visualization", - "Programming Language :: Rust", - "License :: OSI Approved :: MIT License", - "License :: OSI Approved :: Apache Software License", - "Operating System :: POSIX :: Linux", - "Operating System :: MacOS", + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Visualization", + "Programming Language :: Rust", + "License :: OSI Approved :: MIT License", + "License :: OSI Approved :: Apache Software License", + "Operating System :: POSIX :: Linux", + "Operating System :: MacOS", ] [project.urls] diff --git a/dimos/src/interaction/handle.rs b/dimos/src/interaction/handle.rs index 0f71a6f11fd6..2378724bcd4c 100644 --- a/dimos/src/interaction/handle.rs +++ b/dimos/src/interaction/handle.rs @@ -34,8 +34,8 @@ impl InteractionHandle { is_2d, }; - if let Err(e) = self.tx.send(event) { - eprintln!("Failed to send click event: {}", e); + if let Err(err) = self.tx.send(event) { + eprintln!("Failed to send click event: {}", err); } } } diff --git a/dimos/src/interaction/keyboard.rs b/dimos/src/interaction/keyboard.rs index 1e7d51f3c71f..26184566765d 100644 --- a/dimos/src/interaction/keyboard.rs +++ b/dimos/src/interaction/keyboard.rs @@ -1,5 +1,5 @@ //! Keyboard handler for WASD movement controls that publish Twist messages. -//! +//! //! Converts keyboard input to robot velocity commands following teleop conventions: //! - WASD/arrows for linear/angular motion //! - QE for strafing @@ -98,8 +98,8 @@ impl KeyboardHandler { // If not engaged, don't capture any keys if !self.engaged { if self.was_active { - if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send stop on disengage: {e:?}"); + if let Err(err) = self.publish_stop() { + re_log::warn!("Failed to send stop on disengage: {err:?}"); } self.was_active = false; } @@ -112,8 +112,8 @@ impl KeyboardHandler { // Check for emergency stop (Space key pressed - one-shot action) if ctx.input(|i| i.key_pressed(egui::Key::Space)) { self.state.reset(); - if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send emergency stop: {e:?}"); + if let Err(err) = self.publish_stop() { + re_log::warn!("Failed to send emergency stop: {err:?}"); } self.was_active = false; self.estop_flash = true; @@ -122,13 +122,13 @@ impl KeyboardHandler { // Publish twist command if keys are active, or stop if just released if self.state.any_active() { - if let Err(e) = self.publish_twist() { - re_log::warn!("Failed to publish twist command: {e:?}"); + if let Err(err) = self.publish_twist() { + re_log::warn!("Failed to publish twist command: {err:?}"); } self.was_active = true; } else if self.was_active { - if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send stop on key release: {e:?}"); + if let Err(err) = self.publish_stop() { + re_log::warn!("Failed to send stop on key release: {err:?}"); } self.was_active = false; } @@ -188,8 +188,8 @@ impl KeyboardHandler { self.engaged = !self.engaged; if !self.engaged { // Send stop when disengaging - if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send stop on disengage: {e:?}"); + if let Err(err) = self.publish_stop() { + re_log::warn!("Failed to send stop on disengage: {err:?}"); } self.state.reset(); self.was_active = false; @@ -204,8 +204,8 @@ impl KeyboardHandler { && ctx.input(|i| i.pointer.primary_clicked()) { self.engaged = false; - if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send stop on outside click: {e:?}"); + if let Err(err) = self.publish_stop() { + re_log::warn!("Failed to send stop on outside click: {err:?}"); } self.state.reset(); self.was_active = false; diff --git a/dimos/src/interaction/lcm.rs b/dimos/src/interaction/lcm.rs index 6b6b4146a8f5..58263413153a 100644 --- a/dimos/src/interaction/lcm.rs +++ b/dimos/src/interaction/lcm.rs @@ -61,10 +61,13 @@ pub struct ClickEvent { pub x: f64, pub y: f64, pub z: f64, + /// Rerun entity path (stored in frame_id per our convention). pub entity_path: String, + /// Unix timestamp in seconds. pub timestamp_sec: i32, + /// Nanosecond remainder. pub timestamp_nsec: i32, } diff --git a/dimos/src/viewer.rs b/dimos/src/viewer.rs index c4f4606e98d3..de73c093a824 100644 --- a/dimos/src/viewer.rs +++ b/dimos/src/viewer.rs @@ -12,10 +12,13 @@ static GLOBAL: re_memory::AccountingAllocator = /// LCM channel for click events (follows RViz convention) const LCM_CHANNEL: &str = "/clicked_point#geometry_msgs.PointStamped"; + /// Minimum time between click events (debouncing) const CLICK_DEBOUNCE_MS: u64 = 100; + /// Maximum rapid clicks to log as warning const RAPID_CLICK_THRESHOLD: usize = 5; + /// Default gRPC listen port (9877 to avoid conflict with stock Rerun on 9876) const DEFAULT_PORT: u16 = 9877; @@ -231,7 +234,7 @@ async fn main() -> Result<(), Box> { ); } Err(err) => { - re_log::error!("Failed to publish LCM click event: {err:?}"); + re_log::error!("Failed to publish LCM click event: {err}"); } } } From 65fd6dafca0ce882e5415e413d2c66bafceda616 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Wed, 18 Mar 2026 14:27:41 -0700 Subject: [PATCH 09/12] add websocket hack to make connect work --- Cargo.lock | 46 ++++++++ dimos/Cargo.toml | 2 + dimos/src/interaction/keyboard.rs | 56 +++++++-- dimos/src/interaction/mod.rs | 2 + dimos/src/interaction/ws.rs | 187 ++++++++++++++++++++++++++++++ dimos/src/viewer.rs | 90 +++++++++----- dimos/test_ws_client.ts | 52 +++++++++ 7 files changed, 395 insertions(+), 40 deletions(-) create mode 100644 dimos/src/interaction/ws.rs create mode 100644 dimos/test_ws_client.ts diff --git a/Cargo.lock b/Cargo.lock index b3be7f215f21..2e5969ab971f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -980,6 +980,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a18ed336352031311f4e0b4dd2ff392d4fbb370777c9d18d7fc9d7359f73871" dependencies = [ "axum-core", + "base64 0.22.1", "bytes", "form_urlencoded", "futures-util", @@ -998,8 +999,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -2330,6 +2333,12 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "data-url" version = "0.3.1" @@ -3112,11 +3121,13 @@ dependencies = [ name = "dimos-viewer" version = "0.30.0-alpha.4" dependencies = [ + "axum", "bincode", "clap", "mimalloc", "rerun", "serde", + "serde_json", "tokio", ] @@ -12595,6 +12606,18 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.16" @@ -13012,6 +13035,23 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78122066b0cb818b8afd08f7ed22f7fdbc3e90815035726f0840d0d26c0747a" +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.2", + "sha1", + "thiserror 2.0.17", + "utf-8", +] + [[package]] name = "twox-hash" version = "2.1.2" @@ -13191,6 +13231,12 @@ dependencies = [ "xmlwriter", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8-ranges" version = "1.0.5" diff --git a/dimos/Cargo.toml b/dimos/Cargo.toml index 3dfc9283fcca..4f64a3492dc8 100644 --- a/dimos/Cargo.toml +++ b/dimos/Cargo.toml @@ -26,10 +26,12 @@ rerun = { path = "../crates/top/rerun", default-features = false, features = [ "server", ] } +axum = { workspace = true, features = ["ws"] } clap = { workspace = true, features = ["derive"] } bincode.workspace = true mimalloc.workspace = true serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true tokio = { workspace = true, features = [ "io-util", "macros", diff --git a/dimos/src/interaction/keyboard.rs b/dimos/src/interaction/keyboard.rs index 26184566765d..516e627d38ac 100644 --- a/dimos/src/interaction/keyboard.rs +++ b/dimos/src/interaction/keyboard.rs @@ -8,6 +8,7 @@ use std::io; use super::lcm::{LcmPublisher, twist_command}; +use super::ws::WsPublisher; use rerun::external::{egui, re_log}; /// LCM channel for twist commands (matches DimOS convention) @@ -64,10 +65,12 @@ impl KeyState { } } -/// Handles keyboard input and publishes Twist via LCM. +/// Handles keyboard input and publishes Twist via LCM or WebSocket. /// Must be activated by clicking the overlay before keys are captured. pub struct KeyboardHandler { publisher: LcmPublisher, + /// When set (connect mode), publish twist/stop over WebSocket instead of LCM. + ws: Option, state: KeyState, was_active: bool, estop_flash: bool, // true briefly after space pressed @@ -75,11 +78,27 @@ pub struct KeyboardHandler { } impl KeyboardHandler { - /// Create a new keyboard handler with LCM publisher on CMD_VEL_CHANNEL. + /// Create a new keyboard handler that publishes twist commands via LCM. pub fn new() -> Result { let publisher = LcmPublisher::new(CMD_VEL_CHANNEL.to_string())?; Ok(Self { publisher, + ws: None, + state: KeyState::new(), + was_active: false, + estop_flash: false, + engaged: false, + }) + } + + /// Create a keyboard handler that publishes via WebSocket (connect mode). + /// + /// LCM is still created (cheap UDP socket) but will not be used. + pub fn new_ws(ws: WsPublisher) -> Result { + let publisher = LcmPublisher::new(CMD_VEL_CHANNEL.to_string())?; + Ok(Self { + publisher, + ws: Some(ws), state: KeyState::new(), was_active: false, estop_flash: false, @@ -308,16 +327,16 @@ impl KeyboardHandler { }); } - /// Convert current KeyState to Twist and publish via LCM. + /// Convert current KeyState to Twist and publish via WebSocket or LCM. fn publish_twist(&mut self) -> io::Result<()> { let (lin_x, lin_y, lin_z, ang_x, ang_y, ang_z) = self.compute_twist(); - let cmd = twist_command( - [lin_x, lin_y, lin_z], - [ang_x, ang_y, ang_z], - ); - - self.publisher.publish_twist(&cmd)?; + if let Some(ws) = &self.ws { + ws.send_twist(lin_x, lin_y, lin_z, ang_x, ang_y, ang_z); + } else { + let cmd = twist_command([lin_x, lin_y, lin_z], [ang_x, ang_y, ang_z]); + self.publisher.publish_twist(&cmd)?; + } re_log::trace!( "Published twist: lin=({:.2},{:.2},{:.2}) ang=({:.2},{:.2},{:.2})", @@ -327,10 +346,14 @@ impl KeyboardHandler { Ok(()) } - /// Publish all-zero twist (stop command) + /// Publish all-zero twist (stop command) via WebSocket or LCM. fn publish_stop(&mut self) -> io::Result<()> { - let cmd = twist_command([0.0, 0.0, 0.0], [0.0, 0.0, 0.0]); - self.publisher.publish_twist(&cmd)?; + if let Some(ws) = &self.ws { + ws.send_stop(); + } else { + let cmd = twist_command([0.0, 0.0, 0.0], [0.0, 0.0, 0.0]); + self.publisher.publish_twist(&cmd)?; + } re_log::debug!("Published stop command"); Ok(()) } @@ -403,6 +426,7 @@ mod tests { state.forward = true; let handler = KeyboardHandler { publisher: LcmPublisher::new("/test".to_string()).unwrap(), + ws: None, state, was_active: false, estop_flash: false, @@ -420,6 +444,7 @@ mod tests { state.left = true; let handler = KeyboardHandler { publisher: LcmPublisher::new("/test".to_string()).unwrap(), + ws: None, state, was_active: false, estop_flash: false, @@ -434,6 +459,7 @@ mod tests { state.right = true; let handler = KeyboardHandler { publisher: LcmPublisher::new("/test".to_string()).unwrap(), + ws: None, state, was_active: false, estop_flash: false, @@ -451,6 +477,7 @@ mod tests { state.strafe_l = true; let handler = KeyboardHandler { publisher: LcmPublisher::new("/test".to_string()).unwrap(), + ws: None, state, was_active: false, estop_flash: false, @@ -465,6 +492,7 @@ mod tests { state.strafe_r = true; let handler = KeyboardHandler { publisher: LcmPublisher::new("/test".to_string()).unwrap(), + ws: None, state, was_active: false, estop_flash: false, @@ -483,6 +511,7 @@ mod tests { state.fast = true; let handler = KeyboardHandler { publisher: LcmPublisher::new("/test".to_string()).unwrap(), + ws: None, state, was_active: false, estop_flash: false, @@ -501,6 +530,7 @@ mod tests { state.left = true; let handler = KeyboardHandler { publisher: LcmPublisher::new("/test".to_string()).unwrap(), + ws: None, state, was_active: false, estop_flash: false, @@ -543,6 +573,7 @@ mod tests { state.backward = true; let handler = KeyboardHandler { publisher: LcmPublisher::new("/test".to_string()).unwrap(), + ws: None, state, was_active: false, estop_flash: false, @@ -558,6 +589,7 @@ mod tests { fn test_compute_twist_all_zeros() { let handler = KeyboardHandler { publisher: LcmPublisher::new("/test".to_string()).unwrap(), + ws: None, state: KeyState::new(), was_active: false, estop_flash: false, diff --git a/dimos/src/interaction/mod.rs b/dimos/src/interaction/mod.rs index 6fec89890734..b6db0f5823e0 100644 --- a/dimos/src/interaction/mod.rs +++ b/dimos/src/interaction/mod.rs @@ -2,8 +2,10 @@ pub mod handle; pub mod keyboard; pub mod lcm; pub mod protocol; +pub mod ws; pub use handle::InteractionHandle; pub use keyboard::KeyboardHandler; pub use lcm::{ClickEvent, TwistCommand, LcmPublisher, click_event_from_ms, click_event_now, twist_command}; pub use protocol::ViewerEvent; +pub use ws::WsPublisher; diff --git a/dimos/src/interaction/ws.rs b/dimos/src/interaction/ws.rs new file mode 100644 index 000000000000..3b5e680875d4 --- /dev/null +++ b/dimos/src/interaction/ws.rs @@ -0,0 +1,187 @@ +//! WebSocket event publisher for remote (non-LCM) mode. +//! +//! When `dimos-viewer` is started with `--connect`, LCM multicast is unavailable +//! across machines. This module spawns a small WebSocket server on a local port +//! and broadcasts click and twist events as JSON to every connected client. +//! +//! Message format: newline-delimited JSON objects with a `"type"` discriminant. +//! +//! ```json +//! {"type":"heartbeat","timestamp_ms":1234567890} +//! {"type":"click","x":1.0,"y":2.0,"z":3.0,"entity_path":"/world","timestamp_ms":1234567890} +//! {"type":"twist","linear_x":0.5,"linear_y":0.0,"linear_z":0.0,"angular_x":0.0,"angular_y":0.0,"angular_z":0.8} +//! {"type":"stop"} +//! ``` + +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use axum::{ + Router, + extract::{ + WebSocketUpgrade, + ws::{Message, WebSocket}, + }, + routing::get, +}; +use rerun::external::re_log; +use serde::Serialize; +use tokio::sync::broadcast; + +/// JSON message variants sent over the WebSocket. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WsEvent { + Heartbeat { + timestamp_ms: u64, + }, + Click { + x: f64, + y: f64, + z: f64, + entity_path: String, + timestamp_ms: u64, + }, + Twist { + linear_x: f64, + linear_y: f64, + linear_z: f64, + angular_x: f64, + angular_y: f64, + angular_z: f64, + }, + Stop, +} + +/// Broadcasts `WsEvent`s (serialised to JSON) to all connected WebSocket clients. +/// +/// The internal sender is `Clone`, so you can hand copies to multiple producers +/// (keyboard handler, click handler, …). +#[derive(Clone)] +pub struct WsPublisher { + tx: broadcast::Sender, +} + +impl WsPublisher { + /// Spawn the WebSocket server and heartbeat task, then return a publisher. + /// + /// The server listens on `ws://0.0.0.0:/ws`. + pub fn spawn(port: u16) -> Self { + let (tx, _rx) = broadcast::channel::(256); + + // WebSocket server + { + let tx = tx.clone(); + tokio::spawn(async move { + let app = Router::new().route( + "/ws", + get(move |upgrade: WebSocketUpgrade| { + let rx = tx.subscribe(); + async move { upgrade.on_upgrade(move |socket| serve_client(socket, rx)) } + }), + ); + + let addr = format!("0.0.0.0:{port}"); + match tokio::net::TcpListener::bind(&addr).await { + Ok(listener) => { + re_log::info!("WebSocket event server listening on ws://{addr}/ws"); + if let Err(err) = axum::serve(listener, app).await { + re_log::error!("WebSocket server error: {err}"); + } + } + Err(err) => { + re_log::error!("Failed to bind WebSocket server on {addr}: {err}"); + } + } + }); + } + + // Heartbeat task — 1 Hz + { + let tx = tx.clone(); + tokio::spawn(async move { + let mut ticker = tokio::time::interval(Duration::from_secs(1)); + loop { + ticker.tick().await; + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + let event = WsEvent::Heartbeat { timestamp_ms: ts }; + if let Ok(json) = serde_json::to_string(&event) { + // Ignore send errors — nobody connected yet is fine + let _ = tx.send(json); + } + } + }); + } + + Self { tx } + } + + /// Publish a click event to all connected clients. + pub fn send_click(&self, x: f64, y: f64, z: f64, entity_path: &str, timestamp_ms: u64) { + let event = WsEvent::Click { + x, + y, + z, + entity_path: entity_path.to_string(), + timestamp_ms, + }; + self.broadcast(event); + } + + /// Publish a twist (velocity) command to all connected clients. + pub fn send_twist( + &self, + linear_x: f64, + linear_y: f64, + linear_z: f64, + angular_x: f64, + angular_y: f64, + angular_z: f64, + ) { + let event = WsEvent::Twist { + linear_x, + linear_y, + linear_z, + angular_x, + angular_y, + angular_z, + }; + self.broadcast(event); + } + + /// Publish a stop command to all connected clients. + pub fn send_stop(&self) { + self.broadcast(WsEvent::Stop); + } + + /// Return a clone of the underlying broadcast sender for use in other tasks. + pub fn sender(&self) -> broadcast::Sender { + self.tx.clone() + } + + fn broadcast(&self, event: WsEvent) { + if let Ok(json) = serde_json::to_string(&event) { + // Ignore if there are no receivers + let _ = self.tx.send(json); + } + } +} + +/// Per-client WebSocket task: forward broadcast messages until the client disconnects. +async fn serve_client(mut socket: WebSocket, mut rx: broadcast::Receiver) { + loop { + match rx.recv().await { + Ok(msg) => { + if socket.send(Message::text(msg)).await.is_err() { + break; // client disconnected + } + } + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(n)) => { + re_log::warn!("WebSocket client lagged, dropped {n} messages"); + } + } + } +} diff --git a/dimos/src/viewer.rs b/dimos/src/viewer.rs index de73c093a824..1d3772b2a487 100644 --- a/dimos/src/viewer.rs +++ b/dimos/src/viewer.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use clap::Parser; -use dimos_viewer::interaction::{LcmPublisher, KeyboardHandler, click_event_from_ms}; +use dimos_viewer::interaction::{LcmPublisher, KeyboardHandler, WsPublisher, click_event_from_ms}; use rerun::external::{eframe, egui, re_crash_handler, re_grpc_client, re_grpc_server, re_log, re_memory, re_uri, re_viewer}; #[global_allocator] @@ -22,6 +22,9 @@ const RAPID_CLICK_THRESHOLD: usize = 5; /// Default gRPC listen port (9877 to avoid conflict with stock Rerun on 9876) const DEFAULT_PORT: u16 = 9877; +/// Default WebSocket event port used in --connect mode +const DEFAULT_WS_PORT: u16 = 3030; + /// DimOS Interactive Viewer — a custom Rerun viewer with LCM click-to-navigate. /// /// Accepts the same CLI flags as the stock `rerun` binary so it can be spawned @@ -61,6 +64,14 @@ struct Args { /// Defaults to `rerun+http://127.0.0.1:/proxy`. #[arg(long)] connect: Option>, + + /// WebSocket port for publishing click/keyboard events in --connect mode. + /// + /// A WebSocket server is started on `ws://0.0.0.0:/ws` and + /// broadcasts JSON events to all connected clients. + /// Ignored when not using --connect. + #[arg(long, default_value_t = DEFAULT_WS_PORT)] + ws_port: u16, } /// Wraps re_viewer::App to add keyboard control interception. @@ -145,14 +156,20 @@ async fn main() -> Result<(), Box> { ) }; - // Create LCM publisher for click events - let lcm_publisher = LcmPublisher::new(LCM_CHANNEL.to_string()) - .expect("Failed to create LCM publisher"); - re_log::info!("LCM publisher created for channel: {LCM_CHANNEL}"); - - // Create keyboard handler - let keyboard_handler = KeyboardHandler::new() - .expect("Failed to create keyboard handler"); + // In connect mode: use WebSocket for events. Otherwise: use LCM (multicast). + let (lcm_publisher, ws_publisher, keyboard_handler) = if args.connect.is_some() { + let ws = WsPublisher::spawn(args.ws_port); + let kb = KeyboardHandler::new_ws(ws.clone()) + .expect("Failed to create keyboard handler"); + (None::, Some(ws), kb) + } else { + let lcm = LcmPublisher::new(LCM_CHANNEL.to_string()) + .expect("Failed to create LCM publisher"); + re_log::info!("LCM publisher created for channel: {LCM_CHANNEL}"); + let kb = KeyboardHandler::new() + .expect("Failed to create keyboard handler"); + (Some(lcm), None::, kb) + }; // State for debouncing and rapid click detection @@ -216,25 +233,42 @@ async fn main() -> Result<(), Box> { .unwrap_or_default() .as_millis() as u64; - // Build click event and publish via LCM - let click = click_event_from_ms( - [pos.x, pos.y, pos.z], - &entity_path.to_string(), - timestamp_ms, - ); - - match lcm_publisher.publish(&click) { - Ok(_) => { - re_log::debug!( - "LCM click event published: entity={}, pos=({:.2}, {:.2}, {:.2})", - entity_path, - pos.x, - pos.y, - pos.z - ); - } - Err(err) => { - re_log::error!("Failed to publish LCM click event: {err}"); + if let Some(ws) = &ws_publisher { + // Connect mode: publish click over WebSocket + ws.send_click( + pos.x as f64, + pos.y as f64, + pos.z as f64, + &entity_path.to_string(), + timestamp_ms, + ); + re_log::debug!( + "WS click event published: entity={}, pos=({:.2}, {:.2}, {:.2})", + entity_path, + pos.x, + pos.y, + pos.z + ); + } else if let Some(lcm) = &lcm_publisher { + // Local mode: publish click via LCM + let click = click_event_from_ms( + [pos.x, pos.y, pos.z], + &entity_path.to_string(), + timestamp_ms, + ); + match lcm.publish(&click) { + Ok(_) => { + re_log::debug!( + "LCM click event published: entity={}, pos=({:.2}, {:.2}, {:.2})", + entity_path, + pos.x, + pos.y, + pos.z + ); + } + Err(err) => { + re_log::error!("Failed to publish LCM click event: {err}"); + } } } } diff --git a/dimos/test_ws_client.ts b/dimos/test_ws_client.ts new file mode 100644 index 000000000000..c038f8881dfb --- /dev/null +++ b/dimos/test_ws_client.ts @@ -0,0 +1,52 @@ +/** + * Deno WebSocket test client for dimos-viewer event stream. + * + * Run with: + * deno run --allow-net dimos/test_ws_client.ts + * + * Or with a custom address: + * deno run --allow-net dimos/test_ws_client.ts ws://127.0.0.1:3030/ws + */ + +const url = Deno.args[0] ?? "ws://127.0.0.1:3030/ws"; + +console.log(`Connecting to ${url} …`); + +const ws = new WebSocket(url); + +ws.addEventListener("open", () => { + console.log(`[connected] Listening for events from dimos-viewer`); +}); + +ws.addEventListener("message", (ev) => { + const ts = new Date().toISOString(); + try { + const msg = JSON.parse(ev.data as string); + if (msg.type === "heartbeat") { + console.log(`[${ts}] heartbeat ts=${msg.timestamp_ms}`); + } else if (msg.type === "click") { + console.log( + `[${ts}] click x=${msg.x.toFixed(3)} y=${msg.y.toFixed(3)} z=${msg.z.toFixed(3)} entity="${msg.entity_path}"`, + ); + } else if (msg.type === "twist") { + console.log( + `[${ts}] twist lin=(${msg.linear_x.toFixed(2)}, ${msg.linear_y.toFixed(2)}, ${msg.linear_z.toFixed(2)}) ang=(${msg.angular_x.toFixed(2)}, ${msg.angular_y.toFixed(2)}, ${msg.angular_z.toFixed(2)})`, + ); + } else if (msg.type === "stop") { + console.log(`[${ts}] stop`); + } else { + console.log(`[${ts}] unknown `, msg); + } + } catch { + console.log(`[${ts}] raw:`, ev.data); + } +}); + +ws.addEventListener("close", (ev) => { + console.log(`[disconnected] code=${ev.code} reason=${ev.reason}`); + Deno.exit(0); +}); + +ws.addEventListener("error", (ev) => { + console.error(`[error]`, ev); +}); From 3eb8b8b4dbb6d494d75c641b9445c38a9508ec26 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Sat, 21 Mar 2026 15:55:13 -0700 Subject: [PATCH 10/12] feat: switch WS from server to client mode, drain stale commands on reconnect - WsPublisher now connects as a client to RerunWebSocketServer (Python) instead of running its own axum server - Replaced axum + broadcast with tokio-tungstenite + mpsc - Drain queued commands on reconnect to avoid sending stale velocities - Changed --ws-port to --ws-url for full URL control - Moved WS_EVENTS.md to repo root with other docs --- Cargo.lock | 6 +- WS_EVENTS.md | 151 ++++++++++++++++++++++++++++++++++++ dimos/Cargo.toml | 3 +- dimos/src/interaction/ws.rs | 145 +++++++++++++--------------------- dimos/src/viewer.rs | 37 ++++++--- 5 files changed, 236 insertions(+), 106 deletions(-) create mode 100644 WS_EVENTS.md diff --git a/Cargo.lock b/Cargo.lock index 2e5969ab971f..5bf6916d1806 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -980,7 +980,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a18ed336352031311f4e0b4dd2ff392d4fbb370777c9d18d7fc9d7359f73871" dependencies = [ "axum-core", - "base64 0.22.1", "bytes", "form_urlencoded", "futures-util", @@ -999,10 +998,8 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sha1", "sync_wrapper", "tokio", - "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -3121,14 +3118,15 @@ dependencies = [ name = "dimos-viewer" version = "0.30.0-alpha.4" dependencies = [ - "axum", "bincode", "clap", + "futures-util", "mimalloc", "rerun", "serde", "serde_json", "tokio", + "tokio-tungstenite", ] [[package]] diff --git a/WS_EVENTS.md b/WS_EVENTS.md new file mode 100644 index 000000000000..ee0c5e7f5edc --- /dev/null +++ b/WS_EVENTS.md @@ -0,0 +1,151 @@ +# dimos-viewer WebSocket Event Stream + +When `dimos-viewer` is started with `--connect`, LCM multicast is not available +(LCM uses UDP multicast which is limited to the local machine or subnet). Instead, +the viewer starts a WebSocket server that broadcasts click and keyboard events as +JSON to any connected client. + +## Starting the server + +```sh +dimos-viewer --connect [] [--ws-port ] +``` + +| Flag | Default | Description | +|------|---------|-------------| +| `--connect [url]` | — | Enable connect mode. Optionally specify the gRPC proxy URL (defaults to `rerun+http://127.0.0.1:9877/proxy`). | +| `--ws-port ` | `3030` | Port for the WebSocket event server. | + +The WebSocket server listens on: + +``` +ws://0.0.0.0:/ws +``` + +Multiple clients can connect simultaneously. All connected clients receive every +message. The server does not accept any inbound messages from clients. + +## Message format + +All messages are UTF-8 JSON objects with a `"type"` string discriminant. + +### `heartbeat` + +Sent once per second regardless of viewer activity. Useful for connection health +checks and detecting viewer restarts. + +```json +{ + "type": "heartbeat", + "timestamp_ms": 1773869091428 +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `timestamp_ms` | `u64` | Unix timestamp in milliseconds (from the viewer's system clock). | + +### `click` + +Sent when the user clicks a 3D point in the Rerun viewport. Corresponds to the +`/clicked_point` convention from RViz / `geometry_msgs/PointStamped`. + +```json +{ + "type": "click", + "x": 1.234, + "y": -0.567, + "z": 0.000, + "entity_path": "/world/ground_plane", + "timestamp_ms": 1773869091428 +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `x` | `f64` | World-space X coordinate (metres). | +| `y` | `f64` | World-space Y coordinate (metres). | +| `z` | `f64` | World-space Z coordinate (metres). | +| `entity_path` | `string` | Rerun entity path of the clicked object. | +| `timestamp_ms` | `u64` | Unix timestamp in milliseconds at the moment of the click. | + +Click events are debounced: a minimum of 100 ms must elapse between successive +published clicks. Rapid clicks within that window are silently dropped (with a +warning logged after 5 consecutive rapid clicks). + +### `twist` + +Sent every frame (~60 Hz) while movement keys are held on the keyboard teleop +overlay. Corresponds to `geometry_msgs/Twist` / `/cmd_vel` convention. + +The keyboard overlay must first be **engaged** by clicking on it (green border = +active). Clicking anywhere outside the overlay disengages it and sends a `stop`. + +```json +{ + "type": "twist", + "linear_x": 0.5, + "linear_y": 0.0, + "linear_z": 0.0, + "angular_x": 0.0, + "angular_y": 0.0, + "angular_z": 0.8 +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `linear_x` | `f64` | Forward (+) / backward (−) velocity in m/s. | +| `linear_y` | `f64` | Strafe left (+) / right (−) velocity in m/s. | +| `linear_z` | `f64` | Vertical velocity in m/s (always 0 for ground robots). | +| `angular_x` | `f64` | Roll rate in rad/s (always 0). | +| `angular_y` | `f64` | Pitch rate in rad/s (always 0). | +| `angular_z` | `f64` | Yaw left (+) / right (−) rate in rad/s. | + +**Key bindings:** + +| Key | Effect | +|-----|--------| +| `W` / `↑` | Forward (`linear_x = +0.5`) | +| `S` / `↓` | Backward (`linear_x = −0.5`) | +| `A` / `←` | Turn left (`angular_z = +0.8`) | +| `D` / `→` | Turn right (`angular_z = −0.8`) | +| `Q` | Strafe left (`linear_y = +0.5`) | +| `E` | Strafe right (`linear_y = −0.5`) | +| `Shift` | Speed multiplier ×2 | + +Multiple keys can be held simultaneously; their effects are summed. + +### `stop` + +Sent when all movement keys are released, the overlay is disengaged, or `Space` +is pressed (emergency stop). Signals the robot to halt immediately. + +```json +{ + "type": "stop" +} +``` + +No additional fields. Semantically equivalent to a `twist` with all fields zero. + +## Example client (Deno) + +A reference test client is provided at `dimos/test_ws_client.ts`: + +```sh +deno run --allow-net dimos/test_ws_client.ts +# or with a custom address: +deno run --allow-net dimos/test_ws_client.ts ws://192.168.1.10:3030/ws +``` + +## Local mode (no `--connect`) + +Without `--connect`, the viewer uses LCM UDP multicast instead of WebSocket: + +| Channel | Message type | +|---------|-------------| +| `/clicked_point#geometry_msgs.PointStamped` | Click events | +| `/cmd_vel#geometry_msgs.Twist` | Twist / stop commands | + +The WebSocket server is **not** started in this mode. diff --git a/dimos/Cargo.toml b/dimos/Cargo.toml index 4f64a3492dc8..c2150ed5265b 100644 --- a/dimos/Cargo.toml +++ b/dimos/Cargo.toml @@ -26,9 +26,9 @@ rerun = { path = "../crates/top/rerun", default-features = false, features = [ "server", ] } -axum = { workspace = true, features = ["ws"] } clap = { workspace = true, features = ["derive"] } bincode.workspace = true +futures-util.workspace = true mimalloc.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true @@ -41,6 +41,7 @@ tokio = { workspace = true, features = [ "sync", "time", ] } +tokio-tungstenite = "0.28.0" [lints] workspace = true diff --git a/dimos/src/interaction/ws.rs b/dimos/src/interaction/ws.rs index 3b5e680875d4..037e858e3f6f 100644 --- a/dimos/src/interaction/ws.rs +++ b/dimos/src/interaction/ws.rs @@ -1,39 +1,28 @@ -//! WebSocket event publisher for remote (non-LCM) mode. +//! WebSocket client for remote (non-LCM) mode. //! //! When `dimos-viewer` is started with `--connect`, LCM multicast is unavailable -//! across machines. This module spawns a small WebSocket server on a local port -//! and broadcasts click and twist events as JSON to every connected client. +//! across machines. This module connects to a WebSocket server (typically the +//! Python `RerunWebSocketServer` module) and sends click, twist, and stop events +//! as JSON. //! -//! Message format: newline-delimited JSON objects with a `"type"` discriminant. +//! Message format (JSON objects with a `"type"` discriminant): //! //! ```json -//! {"type":"heartbeat","timestamp_ms":1234567890} //! {"type":"click","x":1.0,"y":2.0,"z":3.0,"entity_path":"/world","timestamp_ms":1234567890} //! {"type":"twist","linear_x":0.5,"linear_y":0.0,"linear_z":0.0,"angular_x":0.0,"angular_y":0.0,"angular_z":0.8} //! {"type":"stop"} //! ``` -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::Duration; -use axum::{ - Router, - extract::{ - WebSocketUpgrade, - ws::{Message, WebSocket}, - }, - routing::get, -}; use rerun::external::re_log; use serde::Serialize; -use tokio::sync::broadcast; +use tokio::sync::mpsc; /// JSON message variants sent over the WebSocket. #[derive(Debug, Clone, Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum WsEvent { - Heartbeat { - timestamp_ms: u64, - }, Click { x: f64, y: f64, @@ -52,73 +41,32 @@ pub enum WsEvent { Stop, } -/// Broadcasts `WsEvent`s (serialised to JSON) to all connected WebSocket clients. +/// Sends `WsEvent`s (serialised to JSON) to a remote WebSocket server. /// -/// The internal sender is `Clone`, so you can hand copies to multiple producers +/// Maintains a persistent connection with automatic reconnection. The +/// internal sender is `Clone`, so you can hand copies to multiple producers /// (keyboard handler, click handler, …). #[derive(Clone)] pub struct WsPublisher { - tx: broadcast::Sender, + tx: mpsc::Sender, } impl WsPublisher { - /// Spawn the WebSocket server and heartbeat task, then return a publisher. + /// Spawn the WebSocket client task and return a publisher. /// - /// The server listens on `ws://0.0.0.0:/ws`. - pub fn spawn(port: u16) -> Self { - let (tx, _rx) = broadcast::channel::(256); - - // WebSocket server - { - let tx = tx.clone(); - tokio::spawn(async move { - let app = Router::new().route( - "/ws", - get(move |upgrade: WebSocketUpgrade| { - let rx = tx.subscribe(); - async move { upgrade.on_upgrade(move |socket| serve_client(socket, rx)) } - }), - ); - - let addr = format!("0.0.0.0:{port}"); - match tokio::net::TcpListener::bind(&addr).await { - Ok(listener) => { - re_log::info!("WebSocket event server listening on ws://{addr}/ws"); - if let Err(err) = axum::serve(listener, app).await { - re_log::error!("WebSocket server error: {err}"); - } - } - Err(err) => { - re_log::error!("Failed to bind WebSocket server on {addr}: {err}"); - } - } - }); - } + /// The client connects to `url` (e.g. `ws://127.0.0.1:3030/ws`) and + /// reconnects automatically whenever the connection drops. + pub fn connect(url: String) -> Self { + let (tx, rx) = mpsc::channel::(256); - // Heartbeat task — 1 Hz - { - let tx = tx.clone(); - tokio::spawn(async move { - let mut ticker = tokio::time::interval(Duration::from_secs(1)); - loop { - ticker.tick().await; - let ts = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64; - let event = WsEvent::Heartbeat { timestamp_ms: ts }; - if let Ok(json) = serde_json::to_string(&event) { - // Ignore send errors — nobody connected yet is fine - let _ = tx.send(json); - } - } - }); - } + tokio::spawn(async move { + run_client(url, rx).await; + }); Self { tx } } - /// Publish a click event to all connected clients. + /// Publish a click event. pub fn send_click(&self, x: f64, y: f64, z: f64, entity_path: &str, timestamp_ms: u64) { let event = WsEvent::Click { x, @@ -130,7 +78,7 @@ impl WsPublisher { self.broadcast(event); } - /// Publish a twist (velocity) command to all connected clients. + /// Publish a twist (velocity) command. pub fn send_twist( &self, linear_x: f64, @@ -151,37 +99,54 @@ impl WsPublisher { self.broadcast(event); } - /// Publish a stop command to all connected clients. + /// Publish a stop command. pub fn send_stop(&self) { self.broadcast(WsEvent::Stop); } - /// Return a clone of the underlying broadcast sender for use in other tasks. - pub fn sender(&self) -> broadcast::Sender { - self.tx.clone() - } - fn broadcast(&self, event: WsEvent) { if let Ok(json) = serde_json::to_string(&event) { - // Ignore if there are no receivers - let _ = self.tx.send(json); + // Non-blocking: drop message if the channel is full rather than block the UI thread. + if self.tx.try_send(json).is_err() { + re_log::warn!("WsPublisher: send queue full, dropped event"); + } } } } -/// Per-client WebSocket task: forward broadcast messages until the client disconnects. -async fn serve_client(mut socket: WebSocket, mut rx: broadcast::Receiver) { +/// Background task: connect → send → reconnect loop. +async fn run_client(url: String, mut rx: mpsc::Receiver) { + use tokio_tungstenite::{connect_async, tungstenite::Message}; + use futures_util::SinkExt; + loop { - match rx.recv().await { - Ok(msg) => { - if socket.send(Message::text(msg)).await.is_err() { - break; // client disconnected + re_log::info!("WsPublisher: connecting to {url}"); + + match connect_async(&url).await { + Ok((mut ws_stream, _)) => { + re_log::info!("WsPublisher: connected to {url}"); + + // Drain the channel into the WebSocket until it closes or errors. + while let Some(msg) = rx.recv().await { + if let Err(err) = ws_stream.send(Message::text(msg)).await { + re_log::warn!("WsPublisher: send error: {err} — reconnecting"); + break; + } + } + // rx closed → task is done + if rx.is_closed() { + break; } } - Err(broadcast::error::RecvError::Closed) => break, - Err(broadcast::error::RecvError::Lagged(n)) => { - re_log::warn!("WebSocket client lagged, dropped {n} messages"); + Err(err) => { + re_log::debug!("WsPublisher: connection failed: {err} — retrying in 1s"); } } + + // Drain any stale commands queued during the disconnect — sending + // outdated velocity commands on reconnect would be dangerous. + while rx.try_recv().is_ok() {} + + tokio::time::sleep(Duration::from_secs(1)).await; } } diff --git a/dimos/src/viewer.rs b/dimos/src/viewer.rs index 1d3772b2a487..afa3f0115969 100644 --- a/dimos/src/viewer.rs +++ b/dimos/src/viewer.rs @@ -22,8 +22,8 @@ const RAPID_CLICK_THRESHOLD: usize = 5; /// Default gRPC listen port (9877 to avoid conflict with stock Rerun on 9876) const DEFAULT_PORT: u16 = 9877; -/// Default WebSocket event port used in --connect mode -const DEFAULT_WS_PORT: u16 = 3030; +/// Default WebSocket server URL to connect to in --connect mode +const DEFAULT_WS_URL: &str = "ws://127.0.0.1:3030/ws"; /// DimOS Interactive Viewer — a custom Rerun viewer with LCM click-to-navigate. /// @@ -65,13 +65,18 @@ struct Args { #[arg(long)] connect: Option>, - /// WebSocket port for publishing click/keyboard events in --connect mode. + /// WebSocket server URL to connect to for publishing click/keyboard events. /// - /// A WebSocket server is started on `ws://0.0.0.0:/ws` and - /// broadcasts JSON events to all connected clients. - /// Ignored when not using --connect. - #[arg(long, default_value_t = DEFAULT_WS_PORT)] - ws_port: u16, + /// The viewer connects as a WebSocket CLIENT to this URL and sends JSON + /// events (click, twist, stop). The server is typically the Python + /// `RerunWebSocketServer` DimOS module. + /// + /// When provided explicitly this flag activates WebSocket event publishing + /// in ALL modes (local gRPC server or --connect). When omitted, the + /// behaviour depends on the mode: --connect defaults to this URL; local + /// mode defaults to LCM multicast. + #[arg(long)] + ws_url: Option, } /// Wraps re_viewer::App to add keyboard control interception. @@ -156,9 +161,19 @@ async fn main() -> Result<(), Box> { ) }; - // In connect mode: use WebSocket for events. Otherwise: use LCM (multicast). - let (lcm_publisher, ws_publisher, keyboard_handler) = if args.connect.is_some() { - let ws = WsPublisher::spawn(args.ws_port); + // Resolve the effective WebSocket URL: + // - explicit --ws-url → always use WS (local or connect mode) + // - --connect (no --ws-url) → default WS URL + // - neither → LCM multicast + let effective_ws_url: Option = match (&args.ws_url, &args.connect) { + (Some(url), _) => Some(url.clone()), // explicit --ws-url + (None, Some(_)) => Some(DEFAULT_WS_URL.to_string()), // --connect default + (None, None) => None, // local LCM mode + }; + + let (lcm_publisher, ws_publisher, keyboard_handler) = if let Some(ws_url) = effective_ws_url { + let ws = WsPublisher::connect(ws_url.clone()); + re_log::info!("WebSocket client connecting to {ws_url}"); let kb = KeyboardHandler::new_ws(ws.clone()) .expect("Failed to create keyboard handler"); (None::, Some(ws), kb) From a377b48f95c7dc9ecc38aa529a3a2d222236b6df Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Sat, 21 Mar 2026 19:31:35 -0700 Subject: [PATCH 11/12] fix: respond to server WebSocket pings to prevent disconnects The Python server (websockets library) sends keepalive pings every 30s and expects a pong within 30s. The old code only wrote to the stream and never read, so tungstenite never got a chance to auto-reply with pong frames. This caused the server to close the connection every ~2 minutes. Split the stream into read/write halves and spawn a reader task that consumes incoming frames (triggering tungstenite's automatic pong replies). The write loop uses tokio::select! to also detect when the reader exits (server closed), triggering immediate reconnection instead of waiting for the next send to fail. --- dimos/src/interaction/ws.rs | 52 ++++++++++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/dimos/src/interaction/ws.rs b/dimos/src/interaction/ws.rs index 037e858e3f6f..faec46d54d27 100644 --- a/dimos/src/interaction/ws.rs +++ b/dimos/src/interaction/ws.rs @@ -116,25 +116,57 @@ impl WsPublisher { /// Background task: connect → send → reconnect loop. async fn run_client(url: String, mut rx: mpsc::Receiver) { + use futures_util::{SinkExt, StreamExt}; use tokio_tungstenite::{connect_async, tungstenite::Message}; - use futures_util::SinkExt; loop { re_log::info!("WsPublisher: connecting to {url}"); match connect_async(&url).await { - Ok((mut ws_stream, _)) => { + Ok((ws_stream, _)) => { re_log::info!("WsPublisher: connected to {url}"); - // Drain the channel into the WebSocket until it closes or errors. - while let Some(msg) = rx.recv().await { - if let Err(err) = ws_stream.send(Message::text(msg)).await { - re_log::warn!("WsPublisher: send error: {err} — reconnecting"); - break; + let (mut writer, mut reader) = ws_stream.split(); + + // Read task: consume incoming frames (ping → auto pong) so the + // server's keepalive pings get answered and the connection stays + // alive. Exits when the server closes or an error occurs. + let mut read_handle = tokio::spawn(async move { + while let Some(frame) = reader.next().await { + match frame { + Ok(Message::Close(_)) => break, + Err(err) => { + re_log::debug!("WsPublisher: read error: {err}"); + break; + } + _ => {} // Ping/Pong handled by tungstenite internally + } } - } - // rx closed → task is done - if rx.is_closed() { + }); + + // Write loop: drain the channel into the WebSocket. + let disconnected = loop { + tokio::select! { + msg = rx.recv() => { + match msg { + Some(text) => { + if let Err(err) = writer.send(Message::text(text)).await { + re_log::warn!("WsPublisher: send error: {err} — reconnecting"); + break false; + } + } + None => break true, // rx closed → task is done + } + } + _ = &mut read_handle => { + // Reader exited → server closed the connection. + re_log::warn!("WsPublisher: server closed connection — reconnecting"); + break false; + } + } + }; + + if disconnected { break; } } From 07f64d1246a381390d53544ec3f49066e5a9a896 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Sun, 22 Mar 2026 23:23:28 -0700 Subject: [PATCH 12/12] refactor: remove LCM transport, use WebSocket only for viewer events - Delete interaction/lcm.rs (LCM publisher no longer needed) - Simplify KeyboardHandler to take WsPublisher directly - Remove dual LCM/WS code paths in viewer.rs - Always connect via WebSocket (default ws://127.0.0.1:3030/ws) --- dimos/src/interaction/keyboard.rs | 194 +++---------- dimos/src/interaction/lcm.rs | 467 ------------------------------ dimos/src/interaction/mod.rs | 2 - dimos/src/viewer.rs | 96 ++---- dimos/uv.lock | 8 + uv.lock | 2 +- 6 files changed, 82 insertions(+), 687 deletions(-) delete mode 100644 dimos/src/interaction/lcm.rs create mode 100644 dimos/uv.lock diff --git a/dimos/src/interaction/keyboard.rs b/dimos/src/interaction/keyboard.rs index 516e627d38ac..cc3fda6675d9 100644 --- a/dimos/src/interaction/keyboard.rs +++ b/dimos/src/interaction/keyboard.rs @@ -6,14 +6,9 @@ //! - Space for emergency stop //! - Shift for speed multiplier -use std::io; -use super::lcm::{LcmPublisher, twist_command}; use super::ws::WsPublisher; use rerun::external::{egui, re_log}; -/// LCM channel for twist commands (matches DimOS convention) -const CMD_VEL_CHANNEL: &str = "/cmd_vel#geometry_msgs.Twist"; - /// Base speeds for keyboard control const BASE_LINEAR_SPEED: f64 = 0.5; // m/s const BASE_ANGULAR_SPEED: f64 = 0.8; // rad/s @@ -65,12 +60,10 @@ impl KeyState { } } -/// Handles keyboard input and publishes Twist via LCM or WebSocket. +/// Handles keyboard input and publishes Twist via WebSocket. /// Must be activated by clicking the overlay before keys are captured. pub struct KeyboardHandler { - publisher: LcmPublisher, - /// When set (connect mode), publish twist/stop over WebSocket instead of LCM. - ws: Option, + ws: WsPublisher, state: KeyState, was_active: bool, estop_flash: bool, // true briefly after space pressed @@ -78,32 +71,15 @@ pub struct KeyboardHandler { } impl KeyboardHandler { - /// Create a new keyboard handler that publishes twist commands via LCM. - pub fn new() -> Result { - let publisher = LcmPublisher::new(CMD_VEL_CHANNEL.to_string())?; - Ok(Self { - publisher, - ws: None, - state: KeyState::new(), - was_active: false, - estop_flash: false, - engaged: false, - }) - } - - /// Create a keyboard handler that publishes via WebSocket (connect mode). - /// - /// LCM is still created (cheap UDP socket) but will not be used. - pub fn new_ws(ws: WsPublisher) -> Result { - let publisher = LcmPublisher::new(CMD_VEL_CHANNEL.to_string())?; - Ok(Self { - publisher, - ws: Some(ws), + /// Create a new keyboard handler that publishes twist commands via WebSocket. + pub fn new(ws: WsPublisher) -> Self { + Self { + ws, state: KeyState::new(), was_active: false, estop_flash: false, engaged: false, - }) + } } /// Process keyboard input from egui and publish Twist if keys are held. @@ -117,9 +93,7 @@ impl KeyboardHandler { // If not engaged, don't capture any keys if !self.engaged { if self.was_active { - if let Err(err) = self.publish_stop() { - re_log::warn!("Failed to send stop on disengage: {err:?}"); - } + self.publish_stop(); self.was_active = false; } return false; @@ -131,9 +105,7 @@ impl KeyboardHandler { // Check for emergency stop (Space key pressed - one-shot action) if ctx.input(|i| i.key_pressed(egui::Key::Space)) { self.state.reset(); - if let Err(err) = self.publish_stop() { - re_log::warn!("Failed to send emergency stop: {err:?}"); - } + self.publish_stop(); self.was_active = false; self.estop_flash = true; return true; // return true so overlay shows the e-stop flash @@ -141,14 +113,10 @@ impl KeyboardHandler { // Publish twist command if keys are active, or stop if just released if self.state.any_active() { - if let Err(err) = self.publish_twist() { - re_log::warn!("Failed to publish twist command: {err:?}"); - } + self.publish_twist(); self.was_active = true; } else if self.was_active { - if let Err(err) = self.publish_stop() { - re_log::warn!("Failed to send stop on key release: {err:?}"); - } + self.publish_stop(); self.was_active = false; } @@ -207,9 +175,7 @@ impl KeyboardHandler { self.engaged = !self.engaged; if !self.engaged { // Send stop when disengaging - if let Err(err) = self.publish_stop() { - re_log::warn!("Failed to send stop on disengage: {err:?}"); - } + self.publish_stop(); self.state.reset(); self.was_active = false; } @@ -223,9 +189,7 @@ impl KeyboardHandler { && ctx.input(|i| i.pointer.primary_clicked()) { self.engaged = false; - if let Err(err) = self.publish_stop() { - re_log::warn!("Failed to send stop on outside click: {err:?}"); - } + self.publish_stop(); self.state.reset(); self.was_active = false; } @@ -327,35 +291,21 @@ impl KeyboardHandler { }); } - /// Convert current KeyState to Twist and publish via WebSocket or LCM. - fn publish_twist(&mut self) -> io::Result<()> { + /// Convert current KeyState to Twist and publish via WebSocket. + fn publish_twist(&mut self) { let (lin_x, lin_y, lin_z, ang_x, ang_y, ang_z) = self.compute_twist(); + self.ws.send_twist(lin_x, lin_y, lin_z, ang_x, ang_y, ang_z); - if let Some(ws) = &self.ws { - ws.send_twist(lin_x, lin_y, lin_z, ang_x, ang_y, ang_z); - } else { - let cmd = twist_command([lin_x, lin_y, lin_z], [ang_x, ang_y, ang_z]); - self.publisher.publish_twist(&cmd)?; - } - - re_log::trace!( + re_log::info!( "Published twist: lin=({:.2},{:.2},{:.2}) ang=({:.2},{:.2},{:.2})", lin_x, lin_y, lin_z, ang_x, ang_y, ang_z ); - - Ok(()) } - /// Publish all-zero twist (stop command) via WebSocket or LCM. - fn publish_stop(&mut self) -> io::Result<()> { - if let Some(ws) = &self.ws { - ws.send_stop(); - } else { - let cmd = twist_command([0.0, 0.0, 0.0], [0.0, 0.0, 0.0]); - self.publisher.publish_twist(&cmd)?; - } - re_log::debug!("Published stop command"); - Ok(()) + /// Publish all-zero twist (stop command) via WebSocket. + fn publish_stop(&mut self) { + self.ws.send_stop(); + re_log::info!("Published stop command"); } /// Map KeyState to linear/angular velocities. @@ -405,6 +355,23 @@ impl std::fmt::Debug for KeyboardHandler { mod tests { use super::*; + /// Create a dummy WsPublisher for tests (connects to a non-existent server, + /// which is fine — we only test compute_twist, never actually send). + fn test_ws() -> WsPublisher { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { WsPublisher::connect("ws://127.0.0.1:1/test".to_string()) }) + } + + fn handler_with(state: KeyState) -> KeyboardHandler { + KeyboardHandler { + ws: test_ws(), + state, + was_active: false, + estop_flash: false, + engaged: true, + } + } + #[test] fn test_key_state_any_active() { let mut state = KeyState::new(); @@ -424,14 +391,7 @@ mod tests { fn test_wasd_to_twist_mapping() { let mut state = KeyState::new(); state.forward = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - ws: None, - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, BASE_LINEAR_SPEED); assert_eq!(lin_y, 0.0); @@ -442,14 +402,7 @@ mod tests { fn test_turn_left_right_mapping() { let mut state = KeyState::new(); state.left = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - ws: None, - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, 0.0); @@ -457,14 +410,7 @@ mod tests { let mut state = KeyState::new(); state.right = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - ws: None, - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, 0.0); @@ -475,14 +421,7 @@ mod tests { fn test_strafe_mapping() { let mut state = KeyState::new(); state.strafe_l = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - ws: None, - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, BASE_LINEAR_SPEED); @@ -490,14 +429,7 @@ mod tests { let mut state = KeyState::new(); state.strafe_r = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - ws: None, - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, -BASE_LINEAR_SPEED); @@ -509,14 +441,7 @@ mod tests { let mut state = KeyState::new(); state.forward = true; state.fast = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - ws: None, - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, BASE_LINEAR_SPEED * FAST_MULTIPLIER); assert_eq!(lin_y, 0.0); @@ -528,14 +453,7 @@ mod tests { let mut state = KeyState::new(); state.forward = true; state.left = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - ws: None, - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, BASE_LINEAR_SPEED); assert_eq!(lin_y, 0.0); @@ -558,9 +476,7 @@ mod tests { #[test] fn test_keyboard_handler_creation() { - let handler = KeyboardHandler::new(); - assert!(handler.is_ok()); - let handler = handler.unwrap(); + let handler = KeyboardHandler::new(test_ws()); assert!(!handler.was_active); assert!(!handler.engaged); assert!(!handler.state.any_active()); @@ -571,14 +487,7 @@ mod tests { let mut state = KeyState::new(); state.forward = true; state.backward = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - ws: None, - state, - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, 0.0); @@ -587,14 +496,7 @@ mod tests { #[test] fn test_compute_twist_all_zeros() { - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - ws: None, - state: KeyState::new(), - was_active: false, - estop_flash: false, - engaged: true, - }; + let handler = handler_with(KeyState::new()); let (lin_x, lin_y, lin_z, ang_x, ang_y, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, 0.0); diff --git a/dimos/src/interaction/lcm.rs b/dimos/src/interaction/lcm.rs deleted file mode 100644 index 58263413153a..000000000000 --- a/dimos/src/interaction/lcm.rs +++ /dev/null @@ -1,467 +0,0 @@ -//! LCM (Lightweight Communications and Marshalling) publisher for click events and twist commands. -//! -//! Publishes `geometry_msgs/PointStamped` and `geometry_msgs/Twist` messages over UDP multicast, -//! following the same convention as RViz's `/clicked_point` and `/cmd_vel` topics. -//! -//! ## LCM Wire Protocol (short message) -//! ```text -//! [4B magic "LC02"] [4B seqno] [channel\0] [LCM-encoded payload] -//! ``` -//! -//! ## PointStamped Binary Layout -//! ```text -//! [8B fingerprint hash] [Header (no hash)] [Point (no hash)] -//! -//! Header: -//! [4B seq: i32] [4B stamp.sec: i32] [4B stamp.nsec: i32] -//! [4B frame_id_len: i32 (including null)] [frame_id bytes] [null] -//! -//! Point: -//! [8B x: f64] [8B y: f64] [8B z: f64] -//! ``` -//! -//! ## Twist Binary Layout -//! ```text -//! [8B fingerprint hash] [Twist (no hash)] -//! -//! Twist: -//! Vector3 linear: [8B x: f64] [8B y: f64] [8B z: f64] -//! Vector3 angular: [8B x: f64] [8B y: f64] [8B z: f64] -//! ``` - -use std::net::UdpSocket; -use std::sync::atomic::{AtomicU32, Ordering}; -use std::time::SystemTime; - -/// LCM multicast address and port (default LCM configuration). -const LCM_MULTICAST_ADDR: &str = "239.255.76.67:7667"; - -/// LCM short message magic number: "LC02" in ASCII. -const LCM_MAGIC_SHORT: u32 = 0x4c433032; - -/// Pre-computed fingerprint hash for `geometry_msgs/PointStamped`. -/// -/// Computed from the recursive hash chain: -/// - Time: base=0xde1d24a3a8ecb648 -> rot -> 0xbc3a494751d96c91 -/// - Header: base=0xdbb33f5b4c19b8ea + Time -> rot -> 0x2fdb11453be64af7 -/// - Point: base=0x573f2fdd2f76508f -> rot -> 0xae7e5fba5eeca11e -/// - PointStamped: base=0xf012413a2c8028c2 + Header + Point -> rot -> 0x9cd764738ea629af -const POINT_STAMPED_HASH: u64 = 0x9cd764738ea629af; - -/// Pre-computed fingerprint hash for `geometry_msgs/Twist`. -/// -/// Computed from the recursive hash chain: -/// - Vector3: base=0x573f2fdd2f76508f -> rot -> 0xae7e5fba5eeca11e -/// - Twist: base=0x3a4144772922add7 + Vector3 + Vector3 -> rot -> 0x2e7c07d7cdf7e027 -const TWIST_HASH: u64 = 0x2e7c07d7cdf7e027; - -/// A click event with world-space coordinates and entity info. -#[derive(Debug, Clone)] -pub struct ClickEvent { - pub x: f64, - pub y: f64, - pub z: f64, - - /// Rerun entity path (stored in frame_id per our convention). - pub entity_path: String, - - /// Unix timestamp in seconds. - pub timestamp_sec: i32, - - /// Nanosecond remainder. - pub timestamp_nsec: i32, -} - -/// A velocity command (maps to geometry_msgs/Twist). -#[derive(Debug, Clone)] -pub struct TwistCommand { - pub linear_x: f64, // forward/backward - pub linear_y: f64, // strafe left/right - pub linear_z: f64, // up/down (unused for ground robots) - pub angular_x: f64, // roll (unused) - pub angular_y: f64, // pitch (unused) - pub angular_z: f64, // yaw left/right -} - -/// Encodes a `PointStamped` LCM message (with fingerprint hash prefix). -/// -/// Binary layout: -/// - 8 bytes: fingerprint hash (big-endian i64) -/// - Header (no hash): seq(i32) + stamp.sec(i32) + stamp.nsec(i32) + frame_id(len-prefixed string) -/// - Point (no hash): x(f64) + y(f64) + z(f64) -pub fn encode_point_stamped(event: &ClickEvent) -> Vec { - let frame_id_bytes = event.entity_path.as_bytes(); - // LCM string encoding: i32 length (including null terminator) + bytes + null - let string_len = (frame_id_bytes.len() + 1) as i32; - - // Calculate total size: - // 8 (hash) + 4 (seq) + 4 (sec) + 4 (nsec) + 4 (string_len) + frame_id_bytes + 1 (null) + 24 (3 doubles) - let total_size = 8 + 4 + 4 + 4 + 4 + frame_id_bytes.len() + 1 + 24; - let mut buf = Vec::with_capacity(total_size); - - // Fingerprint hash (big-endian) - buf.extend_from_slice(&POINT_STAMPED_HASH.to_be_bytes()); - - // Header._encodeNoHash: - // seq (i32, big-endian) -- always 0 for click events - buf.extend_from_slice(&0i32.to_be_bytes()); - // stamp.sec (i32) - buf.extend_from_slice(&event.timestamp_sec.to_be_bytes()); - // stamp.nsec (i32) - buf.extend_from_slice(&event.timestamp_nsec.to_be_bytes()); - // frame_id: string = i32 length (incl null) + bytes + null - buf.extend_from_slice(&string_len.to_be_bytes()); - buf.extend_from_slice(frame_id_bytes); - buf.push(0); // null terminator - - // Point._encodeNoHash: - buf.extend_from_slice(&event.x.to_be_bytes()); - buf.extend_from_slice(&event.y.to_be_bytes()); - buf.extend_from_slice(&event.z.to_be_bytes()); - - buf -} - -/// Encodes a `Twist` LCM message (with fingerprint hash prefix). -/// -/// Binary layout: -/// - 8 bytes: fingerprint hash (big-endian u64) -/// - Twist (no hash): linear(Vector3: x,y,z f64) + angular(Vector3: x,y,z f64) -pub fn encode_twist(cmd: &TwistCommand) -> Vec { - // 8 (hash) + 48 (6 doubles) = 56 bytes - let mut buf = Vec::with_capacity(56); - - // Fingerprint hash (big-endian) - buf.extend_from_slice(&TWIST_HASH.to_be_bytes()); - - // Twist._encodeNoHash: - // Vector3 linear: - buf.extend_from_slice(&cmd.linear_x.to_be_bytes()); - buf.extend_from_slice(&cmd.linear_y.to_be_bytes()); - buf.extend_from_slice(&cmd.linear_z.to_be_bytes()); - // Vector3 angular: - buf.extend_from_slice(&cmd.angular_x.to_be_bytes()); - buf.extend_from_slice(&cmd.angular_y.to_be_bytes()); - buf.extend_from_slice(&cmd.angular_z.to_be_bytes()); - - buf -} - -/// Builds a complete LCM UDP packet (short message format). -/// -/// Format: `[4B magic] [4B seqno] [channel\0] [payload]` -pub fn build_lcm_packet(channel: &str, payload: &[u8], seq: u32) -> Vec { - let channel_bytes = channel.as_bytes(); - let total = 4 + 4 + channel_bytes.len() + 1 + payload.len(); - let mut pkt = Vec::with_capacity(total); - - pkt.extend_from_slice(&LCM_MAGIC_SHORT.to_be_bytes()); - pkt.extend_from_slice(&seq.to_be_bytes()); - pkt.extend_from_slice(channel_bytes); - pkt.push(0); // null terminator - pkt.extend_from_slice(payload); - - pkt -} - -/// LCM publisher that sends PointStamped and Twist messages via UDP multicast. -pub struct LcmPublisher { - socket: UdpSocket, - seq: AtomicU32, - channel: String, -} - -impl LcmPublisher { - /// Create a new LCM publisher. - /// - /// `channel` is the LCM channel name, e.g. - /// `"/clicked_point#geometry_msgs.PointStamped"` or - /// `"/cmd_vel#geometry_msgs.Twist"`. - pub fn new(channel: String) -> std::io::Result { - let socket = UdpSocket::bind("0.0.0.0:0")?; - // TTL=0 means local machine only; TTL=1 for same subnet - socket.set_multicast_ttl_v4(0)?; - Ok(Self { - socket, - seq: AtomicU32::new(0), - channel, - }) - } - - /// Publish a click event as a PointStamped LCM message. - pub fn publish(&self, event: &ClickEvent) -> std::io::Result { - let payload = encode_point_stamped(event); - let seq = self.seq.fetch_add(1, Ordering::Relaxed); - let packet = build_lcm_packet(&self.channel, &payload, seq); - self.socket.send_to(&packet, LCM_MULTICAST_ADDR) - } - - /// Publish a twist command as a Twist LCM message. - pub fn publish_twist(&self, cmd: &TwistCommand) -> std::io::Result { - let payload = encode_twist(cmd); - let seq = self.seq.fetch_add(1, Ordering::Relaxed); - let packet = build_lcm_packet(&self.channel, &payload, seq); - self.socket.send_to(&packet, LCM_MULTICAST_ADDR) - } -} - -impl std::fmt::Debug for LcmPublisher { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LcmPublisher") - .field("channel", &self.channel) - .field("seq", &self.seq.load(Ordering::Relaxed)) - .finish() - } -} - -/// Create a `ClickEvent` from position, entity path, and a millisecond timestamp. -pub fn click_event_from_ms( - position: [f32; 3], - entity_path: &str, - timestamp_ms: u64, -) -> ClickEvent { - let total_secs = (timestamp_ms / 1000) as i32; - let nanos = ((timestamp_ms % 1000) * 1_000_000) as i32; - ClickEvent { - x: position[0] as f64, - y: position[1] as f64, - z: position[2] as f64, - entity_path: entity_path.to_string(), - timestamp_sec: total_secs, - timestamp_nsec: nanos, - } -} - -/// Create a `ClickEvent` from position and entity path, using the current time. -pub fn click_event_now(position: [f32; 3], entity_path: &str) -> ClickEvent { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default(); - ClickEvent { - x: position[0] as f64, - y: position[1] as f64, - z: position[2] as f64, - entity_path: entity_path.to_string(), - timestamp_sec: now.as_secs() as i32, - timestamp_nsec: now.subsec_nanos() as i32, - } -} - -/// Create a `TwistCommand` from velocity values. -pub fn twist_command( - linear: [f64; 3], - angular: [f64; 3], -) -> TwistCommand { - TwistCommand { - linear_x: linear[0], - linear_y: linear[1], - linear_z: linear[2], - angular_x: angular[0], - angular_y: angular[1], - angular_z: angular[2], - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_point_stamped_fingerprint() { - fn rot(h: u64) -> u64 { - (h.wrapping_shl(1)).wrapping_add((h >> 63) & 1) - } - let time_hash = rot(0xde1d24a3a8ecb648); - let header_hash = rot(0xdbb33f5b4c19b8ea_u64.wrapping_add(time_hash)); - let point_hash = rot(0x573f2fdd2f76508f); - let ps_hash = - rot(0xf012413a2c8028c2_u64 - .wrapping_add(header_hash) - .wrapping_add(point_hash)); - assert_eq!(ps_hash, POINT_STAMPED_HASH); - } - - #[test] - fn test_twist_fingerprint() { - fn rot(h: u64) -> u64 { - (h.wrapping_shl(1)).wrapping_add((h >> 63) & 1) - } - let vector3_hash = rot(0x573f2fdd2f76508f); - let twist_hash = rot(0x3a4144772922add7_u64 - .wrapping_add(vector3_hash) - .wrapping_add(vector3_hash)); - assert_eq!(twist_hash, TWIST_HASH); - } - - #[test] - fn test_encode_twist_matches_python() { - // Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.3)) - let cmd = TwistCommand { - linear_x: 0.5, - linear_y: 0.0, - linear_z: 0.0, - angular_x: 0.0, - angular_y: 0.0, - angular_z: 0.3, - }; - - let encoded = encode_twist(&cmd); - - let expected_hex = "2e7c07d7cdf7e0273fe000000000000000000000000000000000000000000000000000000000000000000000000000003fd3333333333333"; - let expected: Vec = (0..expected_hex.len()) - .step_by(2) - .map(|i| u8::from_str_radix(&expected_hex[i..i + 2], 16).unwrap()) - .collect(); - - assert_eq!(encoded, expected, "Encoded bytes must match Python LCM output"); - assert_eq!(encoded.len(), 56, "Encoded length must be 56 bytes"); - } - - #[test] - fn test_encode_twist_zero() { - let cmd = TwistCommand { - linear_x: 0.0, - linear_y: 0.0, - linear_z: 0.0, - angular_x: 0.0, - angular_y: 0.0, - angular_z: 0.0, - }; - let encoded = encode_twist(&cmd); - assert_eq!(encoded.len(), 56); - let hash = u64::from_be_bytes(encoded[0..8].try_into().unwrap()); - assert_eq!(hash, TWIST_HASH); - } - - #[test] - fn test_encode_point_stamped_matches_python() { - let event = ClickEvent { - x: 1.5, - y: 2.5, - z: 3.5, - entity_path: "/world/grid".to_string(), - timestamp_sec: 1234, - timestamp_nsec: 5678, - }; - - let encoded = encode_point_stamped(&event); - - let expected_hex = "9cd764738ea629af00000000000004d20000162e0000000c2f776f726c642f67726964003ff80000000000004004000000000000400c000000000000"; - let expected: Vec = (0..expected_hex.len()) - .step_by(2) - .map(|i| u8::from_str_radix(&expected_hex[i..i + 2], 16).unwrap()) - .collect(); - - assert_eq!(encoded, expected, "Encoded bytes must match Python LCM output"); - } - - #[test] - fn test_encode_empty_frame_id() { - let event = ClickEvent { - x: 0.0, - y: 0.0, - z: 0.0, - entity_path: String::new(), - timestamp_sec: 0, - timestamp_nsec: 0, - }; - let encoded = encode_point_stamped(&event); - assert_eq!(encoded.len(), 49); - let str_len = i32::from_be_bytes([encoded[20], encoded[21], encoded[22], encoded[23]]); - assert_eq!(str_len, 1); - } - - #[test] - fn test_build_lcm_packet_format() { - let payload = vec![0xAA, 0xBB]; - let channel = "/test"; - let packet = build_lcm_packet(channel, &payload, 42); - assert_eq!(&packet[0..4], &LCM_MAGIC_SHORT.to_be_bytes()); - assert_eq!(&packet[4..8], &42u32.to_be_bytes()); - let null_pos = packet[8..].iter().position(|&b| b == 0).unwrap() + 8; - let channel_bytes = &packet[8..null_pos]; - assert_eq!(channel_bytes, b"/test"); - assert_eq!(&packet[null_pos + 1..], &[0xAA, 0xBB]); - } - - #[test] - fn test_build_lcm_packet_with_typed_channel() { - let payload = vec![0x01]; - let channel = "/clicked_point#geometry_msgs.PointStamped"; - let packet = build_lcm_packet(channel, &payload, 0); - let null_pos = packet[8..].iter().position(|&b| b == 0).unwrap() + 8; - let extracted_channel = std::str::from_utf8(&packet[8..null_pos]).unwrap(); - assert_eq!(extracted_channel, channel); - } - - #[test] - fn test_click_event_from_ms() { - let event = click_event_from_ms([1.0, 2.0, 3.0], "/world", 1234567); - assert_eq!(event.timestamp_sec, 1234); - assert_eq!(event.timestamp_nsec, 567_000_000); - assert_eq!(event.x, 1.0f64); - assert_eq!(event.entity_path, "/world"); - } - - #[test] - fn test_click_event_now() { - let event = click_event_now([0.0, 0.0, 0.0], "/test"); - let now_sec = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as i32; - assert!((event.timestamp_sec - now_sec).abs() < 10); - } - - #[test] - fn test_twist_command() { - let cmd = twist_command([1.0, 0.0, 0.0], [0.0, 0.0, 0.5]); - assert_eq!(cmd.linear_x, 1.0); - assert_eq!(cmd.angular_z, 0.5); - } - - #[test] - fn test_lcm_publisher_creation() { - let publisher = LcmPublisher::new("/clicked_point#geometry_msgs.PointStamped".to_string()); - assert!(publisher.is_ok()); - let publisher_twist = LcmPublisher::new("/cmd_vel#geometry_msgs.Twist".to_string()); - assert!(publisher_twist.is_ok()); - } - - #[test] - fn test_full_packet_structure() { - let event = ClickEvent { - x: 1.0, - y: 2.0, - z: 3.0, - entity_path: "/world/robot".to_string(), - timestamp_sec: 100, - timestamp_nsec: 200, - }; - let payload = encode_point_stamped(&event); - let channel = "/clicked_point#geometry_msgs.PointStamped"; - let packet = build_lcm_packet(channel, &payload, 7); - let magic = u32::from_be_bytes([packet[0], packet[1], packet[2], packet[3]]); - assert_eq!(magic, LCM_MAGIC_SHORT); - let seqno = u32::from_be_bytes([packet[4], packet[5], packet[6], packet[7]]); - assert_eq!(seqno, 7); - let null_pos = packet[8..].iter().position(|&b| b == 0).unwrap() + 8; - let ch = std::str::from_utf8(&packet[8..null_pos]).unwrap(); - assert_eq!(ch, channel); - let data_start = null_pos + 1; - let hash_bytes: [u8; 8] = packet[data_start..data_start + 8].try_into().unwrap(); - let hash = u64::from_be_bytes(hash_bytes); - assert_eq!(hash, POINT_STAMPED_HASH); - } - - #[test] - fn test_sequence_number_increments() { - let publisher = - LcmPublisher::new("/test#geometry_msgs.PointStamped".to_string()).unwrap(); - assert_eq!(publisher.seq.load(Ordering::Relaxed), 0); - let seq1 = publisher.seq.fetch_add(1, Ordering::Relaxed); - assert_eq!(seq1, 0); - let seq2 = publisher.seq.fetch_add(1, Ordering::Relaxed); - assert_eq!(seq2, 1); - } -} diff --git a/dimos/src/interaction/mod.rs b/dimos/src/interaction/mod.rs index b6db0f5823e0..7dfebdbb08c8 100644 --- a/dimos/src/interaction/mod.rs +++ b/dimos/src/interaction/mod.rs @@ -1,11 +1,9 @@ pub mod handle; pub mod keyboard; -pub mod lcm; pub mod protocol; pub mod ws; pub use handle::InteractionHandle; pub use keyboard::KeyboardHandler; -pub use lcm::{ClickEvent, TwistCommand, LcmPublisher, click_event_from_ms, click_event_now, twist_command}; pub use protocol::ViewerEvent; pub use ws::WsPublisher; diff --git a/dimos/src/viewer.rs b/dimos/src/viewer.rs index afa3f0115969..2aaff4075dac 100644 --- a/dimos/src/viewer.rs +++ b/dimos/src/viewer.rs @@ -3,16 +3,13 @@ use std::rc::Rc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use clap::Parser; -use dimos_viewer::interaction::{LcmPublisher, KeyboardHandler, WsPublisher, click_event_from_ms}; +use dimos_viewer::interaction::{KeyboardHandler, WsPublisher}; use rerun::external::{eframe, egui, re_crash_handler, re_grpc_client, re_grpc_server, re_log, re_memory, re_uri, re_viewer}; #[global_allocator] static GLOBAL: re_memory::AccountingAllocator = re_memory::AccountingAllocator::new(mimalloc::MiMalloc); -/// LCM channel for click events (follows RViz convention) -const LCM_CHANNEL: &str = "/clicked_point#geometry_msgs.PointStamped"; - /// Minimum time between click events (debouncing) const CLICK_DEBOUNCE_MS: u64 = 100; @@ -25,7 +22,7 @@ const DEFAULT_PORT: u16 = 9877; /// Default WebSocket server URL to connect to in --connect mode const DEFAULT_WS_URL: &str = "ws://127.0.0.1:3030/ws"; -/// DimOS Interactive Viewer — a custom Rerun viewer with LCM click-to-navigate. +/// DimOS Interactive Viewer — a custom Rerun viewer with click-to-navigate. /// /// Accepts the same CLI flags as the stock `rerun` binary so it can be spawned /// seamlessly via `rerun_bindings.spawn(executable_name="dimos-viewer")`. @@ -71,10 +68,8 @@ struct Args { /// events (click, twist, stop). The server is typically the Python /// `RerunWebSocketServer` DimOS module. /// - /// When provided explicitly this flag activates WebSocket event publishing - /// in ALL modes (local gRPC server or --connect). When omitted, the - /// behaviour depends on the mode: --connect defaults to this URL; local - /// mode defaults to LCM multicast. + /// When provided explicitly this flag overrides the default URL. + /// When omitted, defaults to ws://127.0.0.1:3030/ws. #[arg(long)] ws_url: Option, } @@ -161,30 +156,13 @@ async fn main() -> Result<(), Box> { ) }; - // Resolve the effective WebSocket URL: - // - explicit --ws-url → always use WS (local or connect mode) - // - --connect (no --ws-url) → default WS URL - // - neither → LCM multicast - let effective_ws_url: Option = match (&args.ws_url, &args.connect) { - (Some(url), _) => Some(url.clone()), // explicit --ws-url - (None, Some(_)) => Some(DEFAULT_WS_URL.to_string()), // --connect default - (None, None) => None, // local LCM mode - }; - - let (lcm_publisher, ws_publisher, keyboard_handler) = if let Some(ws_url) = effective_ws_url { - let ws = WsPublisher::connect(ws_url.clone()); - re_log::info!("WebSocket client connecting to {ws_url}"); - let kb = KeyboardHandler::new_ws(ws.clone()) - .expect("Failed to create keyboard handler"); - (None::, Some(ws), kb) - } else { - let lcm = LcmPublisher::new(LCM_CHANNEL.to_string()) - .expect("Failed to create LCM publisher"); - re_log::info!("LCM publisher created for channel: {LCM_CHANNEL}"); - let kb = KeyboardHandler::new() - .expect("Failed to create keyboard handler"); - (Some(lcm), None::, kb) - }; + // Resolve the WebSocket URL (always WebSocket, no LCM): + // - explicit --ws-url → use that URL + // - otherwise → default URL + let ws_url = args.ws_url.unwrap_or_else(|| DEFAULT_WS_URL.to_string()); + let ws_publisher = WsPublisher::connect(ws_url.clone()); + re_log::info!("WebSocket client connecting to {ws_url}"); + let keyboard_handler = KeyboardHandler::new(ws_publisher.clone()); // State for debouncing and rapid click detection @@ -248,44 +226,20 @@ async fn main() -> Result<(), Box> { .unwrap_or_default() .as_millis() as u64; - if let Some(ws) = &ws_publisher { - // Connect mode: publish click over WebSocket - ws.send_click( - pos.x as f64, - pos.y as f64, - pos.z as f64, - &entity_path.to_string(), - timestamp_ms, - ); - re_log::debug!( - "WS click event published: entity={}, pos=({:.2}, {:.2}, {:.2})", - entity_path, - pos.x, - pos.y, - pos.z - ); - } else if let Some(lcm) = &lcm_publisher { - // Local mode: publish click via LCM - let click = click_event_from_ms( - [pos.x, pos.y, pos.z], - &entity_path.to_string(), - timestamp_ms, - ); - match lcm.publish(&click) { - Ok(_) => { - re_log::debug!( - "LCM click event published: entity={}, pos=({:.2}, {:.2}, {:.2})", - entity_path, - pos.x, - pos.y, - pos.z - ); - } - Err(err) => { - re_log::error!("Failed to publish LCM click event: {err}"); - } - } - } + ws_publisher.send_click( + pos.x as f64, + pos.y as f64, + pos.z as f64, + &entity_path.to_string(), + timestamp_ms, + ); + re_log::debug!( + "Click event published: entity={}, pos=({:.2}, {:.2}, {:.2})", + entity_path, + pos.x, + pos.y, + pos.z + ); } re_viewer::SelectionChangeItem::Entity { position: None, .. } => { no_position_count += 1; diff --git a/dimos/uv.lock b/dimos/uv.lock new file mode 100644 index 000000000000..f135a581b73b --- /dev/null +++ b/dimos/uv.lock @@ -0,0 +1,8 @@ +version = 1 +revision = 3 +requires-python = ">=3.10" + +[[package]] +name = "dimos-viewer" +version = "0.30.0a4" +source = { editable = "." } diff --git a/uv.lock b/uv.lock index 4acb800535c1..8091c36a1a97 100644 --- a/uv.lock +++ b/uv.lock @@ -4727,7 +4727,7 @@ provides-extras = ["all", "datafusion", "dataplatform", "notebook", "tests"] [[package]] name = "rerun-workspace" -version = "0.28.0a1+dev" +version = "0.30.0a4" source = { virtual = "." } [package.dev-dependencies]