diff --git a/Cargo.lock b/Cargo.lock index 0748530c4368..5bf6916d1806 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2330,6 +2330,12 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "data-url" version = "0.3.1" @@ -3110,15 +3116,17 @@ dependencies = [ [[package]] name = "dimos-viewer" -version = "0.30.0-alpha.1+dev" +version = "0.30.0-alpha.4" dependencies = [ "bincode", "clap", + "futures-util", "mimalloc", - "parking_lot", "rerun", "serde", + "serde_json", "tokio", + "tokio-tungstenite", ] [[package]] @@ -12596,6 +12604,18 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.16" @@ -13013,6 +13033,23 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78122066b0cb818b8afd08f7ed22f7fdbc3e90815035726f0840d0d26c0747a" +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.2", + "sha1", + "thiserror 2.0.17", + "utf-8", +] + [[package]] name = "twox-hash" version = "2.1.2" @@ -13192,6 +13229,12 @@ dependencies = [ "xmlwriter", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8-ranges" version = "1.0.5" diff --git a/WS_EVENTS.md b/WS_EVENTS.md new file mode 100644 index 000000000000..ee0c5e7f5edc --- /dev/null +++ b/WS_EVENTS.md @@ -0,0 +1,151 @@ +# dimos-viewer WebSocket Event Stream + +When `dimos-viewer` is started with `--connect`, LCM multicast is not available +(LCM uses UDP multicast which is limited to the local machine or subnet). Instead, +the viewer starts a WebSocket server that broadcasts click and keyboard events as +JSON to any connected client. + +## Starting the server + +```sh +dimos-viewer --connect [] [--ws-port ] +``` + +| Flag | Default | Description | +|------|---------|-------------| +| `--connect [url]` | — | Enable connect mode. Optionally specify the gRPC proxy URL (defaults to `rerun+http://127.0.0.1:9877/proxy`). | +| `--ws-port ` | `3030` | Port for the WebSocket event server. | + +The WebSocket server listens on: + +``` +ws://0.0.0.0:/ws +``` + +Multiple clients can connect simultaneously. All connected clients receive every +message. The server does not accept any inbound messages from clients. + +## Message format + +All messages are UTF-8 JSON objects with a `"type"` string discriminant. + +### `heartbeat` + +Sent once per second regardless of viewer activity. Useful for connection health +checks and detecting viewer restarts. + +```json +{ + "type": "heartbeat", + "timestamp_ms": 1773869091428 +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `timestamp_ms` | `u64` | Unix timestamp in milliseconds (from the viewer's system clock). | + +### `click` + +Sent when the user clicks a 3D point in the Rerun viewport. Corresponds to the +`/clicked_point` convention from RViz / `geometry_msgs/PointStamped`. + +```json +{ + "type": "click", + "x": 1.234, + "y": -0.567, + "z": 0.000, + "entity_path": "/world/ground_plane", + "timestamp_ms": 1773869091428 +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `x` | `f64` | World-space X coordinate (metres). | +| `y` | `f64` | World-space Y coordinate (metres). | +| `z` | `f64` | World-space Z coordinate (metres). | +| `entity_path` | `string` | Rerun entity path of the clicked object. | +| `timestamp_ms` | `u64` | Unix timestamp in milliseconds at the moment of the click. | + +Click events are debounced: a minimum of 100 ms must elapse between successive +published clicks. Rapid clicks within that window are silently dropped (with a +warning logged after 5 consecutive rapid clicks). + +### `twist` + +Sent every frame (~60 Hz) while movement keys are held on the keyboard teleop +overlay. Corresponds to `geometry_msgs/Twist` / `/cmd_vel` convention. + +The keyboard overlay must first be **engaged** by clicking on it (green border = +active). Clicking anywhere outside the overlay disengages it and sends a `stop`. + +```json +{ + "type": "twist", + "linear_x": 0.5, + "linear_y": 0.0, + "linear_z": 0.0, + "angular_x": 0.0, + "angular_y": 0.0, + "angular_z": 0.8 +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `linear_x` | `f64` | Forward (+) / backward (−) velocity in m/s. | +| `linear_y` | `f64` | Strafe left (+) / right (−) velocity in m/s. | +| `linear_z` | `f64` | Vertical velocity in m/s (always 0 for ground robots). | +| `angular_x` | `f64` | Roll rate in rad/s (always 0). | +| `angular_y` | `f64` | Pitch rate in rad/s (always 0). | +| `angular_z` | `f64` | Yaw left (+) / right (−) rate in rad/s. | + +**Key bindings:** + +| Key | Effect | +|-----|--------| +| `W` / `↑` | Forward (`linear_x = +0.5`) | +| `S` / `↓` | Backward (`linear_x = −0.5`) | +| `A` / `←` | Turn left (`angular_z = +0.8`) | +| `D` / `→` | Turn right (`angular_z = −0.8`) | +| `Q` | Strafe left (`linear_y = +0.5`) | +| `E` | Strafe right (`linear_y = −0.5`) | +| `Shift` | Speed multiplier ×2 | + +Multiple keys can be held simultaneously; their effects are summed. + +### `stop` + +Sent when all movement keys are released, the overlay is disengaged, or `Space` +is pressed (emergency stop). Signals the robot to halt immediately. + +```json +{ + "type": "stop" +} +``` + +No additional fields. Semantically equivalent to a `twist` with all fields zero. + +## Example client (Deno) + +A reference test client is provided at `dimos/test_ws_client.ts`: + +```sh +deno run --allow-net dimos/test_ws_client.ts +# or with a custom address: +deno run --allow-net dimos/test_ws_client.ts ws://192.168.1.10:3030/ws +``` + +## Local mode (no `--connect`) + +Without `--connect`, the viewer uses LCM UDP multicast instead of WebSocket: + +| Channel | Message type | +|---------|-------------| +| `/clicked_point#geometry_msgs.PointStamped` | Click events | +| `/cmd_vel#geometry_msgs.Twist` | Twist / stop commands | + +The WebSocket server is **not** started in this mode. diff --git a/dimos/Cargo.toml b/dimos/Cargo.toml index dde4b393b475..c2150ed5265b 100644 --- a/dimos/Cargo.toml +++ b/dimos/Cargo.toml @@ -28,9 +28,10 @@ rerun = { path = "../crates/top/rerun", default-features = false, features = [ clap = { workspace = true, features = ["derive"] } bincode.workspace = true +futures-util.workspace = true mimalloc.workspace = true -parking_lot.workspace = true serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true tokio = { workspace = true, features = [ "io-util", "macros", @@ -40,3 +41,7 @@ tokio = { workspace = true, features = [ "sync", "time", ] } +tokio-tungstenite = "0.28.0" + +[lints] +workspace = true diff --git a/dimos/pyproject.toml b/dimos/pyproject.toml index 83159e2bec27..1b1376b75811 100644 --- a/dimos/pyproject.toml +++ b/dimos/pyproject.toml @@ -8,18 +8,18 @@ version = "0.30.0a4" description = "Interactive Rerun viewer for DimOS with click-to-navigate support" readme = "README.md" requires-python = ">=3.10" -license = {text = "MIT OR Apache-2.0"} -authors = [{name = "Dimensional Inc.", email = "engineering@dimensional.com"}] +license = { text = "MIT OR Apache-2.0" } +authors = [{ name = "Dimensional Inc.", email = "engineering@dimensional.com" }] classifiers = [ - "Development Status :: 4 - Beta", - "Intended Audience :: Developers", - "Topic :: Scientific/Engineering", - "Topic :: Scientific/Engineering :: Visualization", - "Programming Language :: Rust", - "License :: OSI Approved :: MIT License", - "License :: OSI Approved :: Apache Software License", - "Operating System :: POSIX :: Linux", - "Operating System :: MacOS", + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Visualization", + "Programming Language :: Rust", + "License :: OSI Approved :: MIT License", + "License :: OSI Approved :: Apache Software License", + "Operating System :: POSIX :: Linux", + "Operating System :: MacOS", ] [project.urls] diff --git a/dimos/src/interaction/handle.rs b/dimos/src/interaction/handle.rs index 0f71a6f11fd6..2378724bcd4c 100644 --- a/dimos/src/interaction/handle.rs +++ b/dimos/src/interaction/handle.rs @@ -34,8 +34,8 @@ impl InteractionHandle { is_2d, }; - if let Err(e) = self.tx.send(event) { - eprintln!("Failed to send click event: {}", e); + if let Err(err) = self.tx.send(event) { + eprintln!("Failed to send click event: {}", err); } } } diff --git a/dimos/src/interaction/keyboard.rs b/dimos/src/interaction/keyboard.rs index b6cdcd809c37..cc3fda6675d9 100644 --- a/dimos/src/interaction/keyboard.rs +++ b/dimos/src/interaction/keyboard.rs @@ -1,25 +1,20 @@ //! Keyboard handler for WASD movement controls that publish Twist messages. -//! +//! //! Converts keyboard input to robot velocity commands following teleop conventions: //! - WASD/arrows for linear/angular motion //! - QE for strafing //! - Space for emergency stop //! - Shift for speed multiplier -use std::io; -use super::lcm::{LcmPublisher, twist_command}; +use super::ws::WsPublisher; use rerun::external::{egui, re_log}; -/// LCM channel for twist commands (matches DimOS convention) -const CMD_VEL_CHANNEL: &str = "/cmd_vel#geometry_msgs.Twist"; - /// Base speeds for keyboard control const BASE_LINEAR_SPEED: f64 = 0.5; // m/s const BASE_ANGULAR_SPEED: f64 = 0.8; // rad/s const FAST_MULTIPLIER: f64 = 2.0; // Shift modifier /// Overlay styling -const OVERLAY_MARGIN: f32 = 12.0; const OVERLAY_PADDING: f32 = 10.0; const OVERLAY_ROUNDING: f32 = 8.0; const OVERLAY_BG: egui::Color32 = egui::Color32::from_rgba_premultiplied(20, 20, 30, 220); @@ -65,54 +60,52 @@ impl KeyState { } } -/// Handles keyboard input and publishes Twist via LCM. +/// Handles keyboard input and publishes Twist via WebSocket. +/// Must be activated by clicking the overlay before keys are captured. pub struct KeyboardHandler { - publisher: LcmPublisher, + ws: WsPublisher, state: KeyState, was_active: bool, estop_flash: bool, // true briefly after space pressed + engaged: bool, // true when user has clicked the overlay to activate } impl KeyboardHandler { - /// Create a new keyboard handler with LCM publisher on CMD_VEL_CHANNEL. - pub fn new() -> Result { - let publisher = LcmPublisher::new(CMD_VEL_CHANNEL.to_string())?; - Ok(Self { - publisher, + /// Create a new keyboard handler that publishes twist commands via WebSocket. + pub fn new(ws: WsPublisher) -> Self { + Self { + ws, state: KeyState::new(), was_active: false, estop_flash: false, - }) + engaged: false, + } } /// Process keyboard input from egui and publish Twist if keys are held. /// Called once per frame from DimosApp.ui(). + /// Only captures keys when the overlay has been clicked (engaged). /// /// Returns true if any movement key is active (for UI overlay). pub fn process(&mut self, ctx: &egui::Context) -> bool { self.estop_flash = false; - // Check if any text widget has focus - if so, skip keyboard capture - let text_has_focus = ctx.memory(|m| m.focused().is_some()); - if text_has_focus { + // If not engaged, don't capture any keys + if !self.engaged { if self.was_active { - if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send stop command on focus change: {e:?}"); - } + self.publish_stop(); self.was_active = false; } return false; } - // Update key state from egui input + // Update key state from egui input (engaged flag is the only gate) self.update_key_state(ctx); // Check for emergency stop (Space key pressed - one-shot action) if ctx.input(|i| i.key_pressed(egui::Key::Space)) { self.state.reset(); - if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send emergency stop: {e:?}"); - } + self.publish_stop(); self.was_active = false; self.estop_flash = true; return true; // return true so overlay shows the e-stop flash @@ -120,47 +113,91 @@ impl KeyboardHandler { // Publish twist command if keys are active, or stop if just released if self.state.any_active() { - if let Err(e) = self.publish_twist() { - re_log::warn!("Failed to publish twist command: {e:?}"); - } + self.publish_twist(); self.was_active = true; } else if self.was_active { - if let Err(e) = self.publish_stop() { - re_log::warn!("Failed to send stop on key release: {e:?}"); - } + self.publish_stop(); self.was_active = false; } self.state.any_active() } - /// Draw keyboard overlay HUD. Always shown (dim when idle, bright when active). - pub fn draw_overlay(&self, ctx: &egui::Context) { - egui::Area::new("keyboard_hud".into()) - .fixed_pos(egui::pos2(OVERLAY_MARGIN, OVERLAY_MARGIN)) + /// Draw keyboard overlay HUD at bottom-right of the 3D viewport area. + /// Clickable: clicking the overlay toggles engaged state. + pub fn draw_overlay(&mut self, ctx: &egui::Context) { + let screen_rect = ctx.content_rect(); + // Default position: bottom-left, just above the timeline bar + let overlay_height = 160.0; + let left_margin = 12.0; + let bottom_timeline_offset = 120.0; + let default_pos = egui::pos2( + screen_rect.min.x + left_margin, + screen_rect.max.y - overlay_height - bottom_timeline_offset, + ); + + let area_response = egui::Area::new("keyboard_hud".into()) + .pivot(egui::Align2::LEFT_BOTTOM) + .default_pos(default_pos) + .movable(true) .order(egui::Order::Foreground) - .interactable(false) + .interactable(true) .show(ctx, |ui| { - egui::Frame::new() + let border_color = if self.engaged { + egui::Color32::from_rgb(60, 180, 75) // green border when active + } else { + egui::Color32::from_rgb(80, 80, 100) // dim border when inactive + }; + + let response = egui::Frame::new() .fill(OVERLAY_BG) .corner_radius(egui::CornerRadius::same(OVERLAY_ROUNDING as u8)) .inner_margin(egui::Margin::same(OVERLAY_PADDING as i8)) + .stroke(egui::Stroke::new(2.0, border_color)) .show(ui, |ui| { self.draw_hud_content(ui); }); - }); + + // Make the frame rect clickable (Frame doesn't have click sense by default) + let click_response = ui.interact( + response.response.rect, + ui.id().with("wasd_click"), + egui::Sense::click(), + ); + + // Force arrow cursor over the entire overlay (overrides label I-beam) + if click_response.hovered() { + ctx.set_cursor_icon(egui::CursorIcon::Default); + } + + // Toggle engaged state on click + if click_response.clicked() { + self.engaged = !self.engaged; + if !self.engaged { + // Send stop when disengaging + self.publish_stop(); + self.state.reset(); + self.was_active = false; + } + } + }) + .response; + + // Disengage when clicking anywhere outside the overlay + if self.engaged + && !ctx.rect_contains_pointer(area_response.layer_id, area_response.interact_rect) + && ctx.input(|i| i.pointer.primary_clicked()) + { + self.engaged = false; + self.publish_stop(); + self.state.reset(); + self.was_active = false; + } } fn draw_hud_content(&self, ui: &mut egui::Ui) { - let active = self.state.any_active() || self.estop_flash; - // Title - let title_color = if active { - egui::Color32::WHITE - } else { - egui::Color32::from_rgb(120, 120, 140) - }; - ui.label(egui::RichText::new("🎮 Keyboard Teleop").color(title_color).size(13.0)); + ui.label(egui::RichText::new("Keyboard Teleop").color(LABEL_COLOR).size(13.0)); ui.add_space(4.0); // Key grid: [Q] [W] [E] @@ -254,31 +291,21 @@ impl KeyboardHandler { }); } - /// Convert current KeyState to Twist and publish via LCM. - fn publish_twist(&mut self) -> io::Result<()> { + /// Convert current KeyState to Twist and publish via WebSocket. + fn publish_twist(&mut self) { let (lin_x, lin_y, lin_z, ang_x, ang_y, ang_z) = self.compute_twist(); + self.ws.send_twist(lin_x, lin_y, lin_z, ang_x, ang_y, ang_z); - let cmd = twist_command( - [lin_x, lin_y, lin_z], - [ang_x, ang_y, ang_z], - ); - - self.publisher.publish_twist(&cmd)?; - - re_log::trace!( + re_log::info!( "Published twist: lin=({:.2},{:.2},{:.2}) ang=({:.2},{:.2},{:.2})", lin_x, lin_y, lin_z, ang_x, ang_y, ang_z ); - - Ok(()) } - /// Publish all-zero twist (stop command) - fn publish_stop(&mut self) -> io::Result<()> { - let cmd = twist_command([0.0, 0.0, 0.0], [0.0, 0.0, 0.0]); - self.publisher.publish_twist(&cmd)?; - re_log::debug!("Published stop command"); - Ok(()) + /// Publish all-zero twist (stop command) via WebSocket. + fn publish_stop(&mut self) { + self.ws.send_stop(); + re_log::info!("Published stop command"); } /// Map KeyState to linear/angular velocities. @@ -328,6 +355,23 @@ impl std::fmt::Debug for KeyboardHandler { mod tests { use super::*; + /// Create a dummy WsPublisher for tests (connects to a non-existent server, + /// which is fine — we only test compute_twist, never actually send). + fn test_ws() -> WsPublisher { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { WsPublisher::connect("ws://127.0.0.1:1/test".to_string()) }) + } + + fn handler_with(state: KeyState) -> KeyboardHandler { + KeyboardHandler { + ws: test_ws(), + state, + was_active: false, + estop_flash: false, + engaged: true, + } + } + #[test] fn test_key_state_any_active() { let mut state = KeyState::new(); @@ -347,12 +391,7 @@ mod tests { fn test_wasd_to_twist_mapping() { let mut state = KeyState::new(); state.forward = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, BASE_LINEAR_SPEED); assert_eq!(lin_y, 0.0); @@ -363,12 +402,7 @@ mod tests { fn test_turn_left_right_mapping() { let mut state = KeyState::new(); state.left = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, 0.0); @@ -376,12 +410,7 @@ mod tests { let mut state = KeyState::new(); state.right = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, 0.0); @@ -392,12 +421,7 @@ mod tests { fn test_strafe_mapping() { let mut state = KeyState::new(); state.strafe_l = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, BASE_LINEAR_SPEED); @@ -405,12 +429,7 @@ mod tests { let mut state = KeyState::new(); state.strafe_r = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, -BASE_LINEAR_SPEED); @@ -422,12 +441,7 @@ mod tests { let mut state = KeyState::new(); state.forward = true; state.fast = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, BASE_LINEAR_SPEED * FAST_MULTIPLIER); assert_eq!(lin_y, 0.0); @@ -439,12 +453,7 @@ mod tests { let mut state = KeyState::new(); state.forward = true; state.left = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, BASE_LINEAR_SPEED); assert_eq!(lin_y, 0.0); @@ -467,10 +476,9 @@ mod tests { #[test] fn test_keyboard_handler_creation() { - let handler = KeyboardHandler::new(); - assert!(handler.is_ok()); - let handler = handler.unwrap(); + let handler = KeyboardHandler::new(test_ws()); assert!(!handler.was_active); + assert!(!handler.engaged); assert!(!handler.state.any_active()); } @@ -479,12 +487,7 @@ mod tests { let mut state = KeyState::new(); state.forward = true; state.backward = true; - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state, - was_active: false, - estop_flash: false, - }; + let handler = handler_with(state); let (lin_x, lin_y, _, _, _, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, 0.0); @@ -493,12 +496,7 @@ mod tests { #[test] fn test_compute_twist_all_zeros() { - let handler = KeyboardHandler { - publisher: LcmPublisher::new("/test".to_string()).unwrap(), - state: KeyState::new(), - was_active: false, - estop_flash: false, - }; + let handler = handler_with(KeyState::new()); let (lin_x, lin_y, lin_z, ang_x, ang_y, ang_z) = handler.compute_twist(); assert_eq!(lin_x, 0.0); assert_eq!(lin_y, 0.0); diff --git a/dimos/src/interaction/lcm.rs b/dimos/src/interaction/lcm.rs deleted file mode 100644 index 6b6b4146a8f5..000000000000 --- a/dimos/src/interaction/lcm.rs +++ /dev/null @@ -1,464 +0,0 @@ -//! LCM (Lightweight Communications and Marshalling) publisher for click events and twist commands. -//! -//! Publishes `geometry_msgs/PointStamped` and `geometry_msgs/Twist` messages over UDP multicast, -//! following the same convention as RViz's `/clicked_point` and `/cmd_vel` topics. -//! -//! ## LCM Wire Protocol (short message) -//! ```text -//! [4B magic "LC02"] [4B seqno] [channel\0] [LCM-encoded payload] -//! ``` -//! -//! ## PointStamped Binary Layout -//! ```text -//! [8B fingerprint hash] [Header (no hash)] [Point (no hash)] -//! -//! Header: -//! [4B seq: i32] [4B stamp.sec: i32] [4B stamp.nsec: i32] -//! [4B frame_id_len: i32 (including null)] [frame_id bytes] [null] -//! -//! Point: -//! [8B x: f64] [8B y: f64] [8B z: f64] -//! ``` -//! -//! ## Twist Binary Layout -//! ```text -//! [8B fingerprint hash] [Twist (no hash)] -//! -//! Twist: -//! Vector3 linear: [8B x: f64] [8B y: f64] [8B z: f64] -//! Vector3 angular: [8B x: f64] [8B y: f64] [8B z: f64] -//! ``` - -use std::net::UdpSocket; -use std::sync::atomic::{AtomicU32, Ordering}; -use std::time::SystemTime; - -/// LCM multicast address and port (default LCM configuration). -const LCM_MULTICAST_ADDR: &str = "239.255.76.67:7667"; - -/// LCM short message magic number: "LC02" in ASCII. -const LCM_MAGIC_SHORT: u32 = 0x4c433032; - -/// Pre-computed fingerprint hash for `geometry_msgs/PointStamped`. -/// -/// Computed from the recursive hash chain: -/// - Time: base=0xde1d24a3a8ecb648 -> rot -> 0xbc3a494751d96c91 -/// - Header: base=0xdbb33f5b4c19b8ea + Time -> rot -> 0x2fdb11453be64af7 -/// - Point: base=0x573f2fdd2f76508f -> rot -> 0xae7e5fba5eeca11e -/// - PointStamped: base=0xf012413a2c8028c2 + Header + Point -> rot -> 0x9cd764738ea629af -const POINT_STAMPED_HASH: u64 = 0x9cd764738ea629af; - -/// Pre-computed fingerprint hash for `geometry_msgs/Twist`. -/// -/// Computed from the recursive hash chain: -/// - Vector3: base=0x573f2fdd2f76508f -> rot -> 0xae7e5fba5eeca11e -/// - Twist: base=0x3a4144772922add7 + Vector3 + Vector3 -> rot -> 0x2e7c07d7cdf7e027 -const TWIST_HASH: u64 = 0x2e7c07d7cdf7e027; - -/// A click event with world-space coordinates and entity info. -#[derive(Debug, Clone)] -pub struct ClickEvent { - pub x: f64, - pub y: f64, - pub z: f64, - /// Rerun entity path (stored in frame_id per our convention). - pub entity_path: String, - /// Unix timestamp in seconds. - pub timestamp_sec: i32, - /// Nanosecond remainder. - pub timestamp_nsec: i32, -} - -/// A velocity command (maps to geometry_msgs/Twist). -#[derive(Debug, Clone)] -pub struct TwistCommand { - pub linear_x: f64, // forward/backward - pub linear_y: f64, // strafe left/right - pub linear_z: f64, // up/down (unused for ground robots) - pub angular_x: f64, // roll (unused) - pub angular_y: f64, // pitch (unused) - pub angular_z: f64, // yaw left/right -} - -/// Encodes a `PointStamped` LCM message (with fingerprint hash prefix). -/// -/// Binary layout: -/// - 8 bytes: fingerprint hash (big-endian i64) -/// - Header (no hash): seq(i32) + stamp.sec(i32) + stamp.nsec(i32) + frame_id(len-prefixed string) -/// - Point (no hash): x(f64) + y(f64) + z(f64) -pub fn encode_point_stamped(event: &ClickEvent) -> Vec { - let frame_id_bytes = event.entity_path.as_bytes(); - // LCM string encoding: i32 length (including null terminator) + bytes + null - let string_len = (frame_id_bytes.len() + 1) as i32; - - // Calculate total size: - // 8 (hash) + 4 (seq) + 4 (sec) + 4 (nsec) + 4 (string_len) + frame_id_bytes + 1 (null) + 24 (3 doubles) - let total_size = 8 + 4 + 4 + 4 + 4 + frame_id_bytes.len() + 1 + 24; - let mut buf = Vec::with_capacity(total_size); - - // Fingerprint hash (big-endian) - buf.extend_from_slice(&POINT_STAMPED_HASH.to_be_bytes()); - - // Header._encodeNoHash: - // seq (i32, big-endian) -- always 0 for click events - buf.extend_from_slice(&0i32.to_be_bytes()); - // stamp.sec (i32) - buf.extend_from_slice(&event.timestamp_sec.to_be_bytes()); - // stamp.nsec (i32) - buf.extend_from_slice(&event.timestamp_nsec.to_be_bytes()); - // frame_id: string = i32 length (incl null) + bytes + null - buf.extend_from_slice(&string_len.to_be_bytes()); - buf.extend_from_slice(frame_id_bytes); - buf.push(0); // null terminator - - // Point._encodeNoHash: - buf.extend_from_slice(&event.x.to_be_bytes()); - buf.extend_from_slice(&event.y.to_be_bytes()); - buf.extend_from_slice(&event.z.to_be_bytes()); - - buf -} - -/// Encodes a `Twist` LCM message (with fingerprint hash prefix). -/// -/// Binary layout: -/// - 8 bytes: fingerprint hash (big-endian u64) -/// - Twist (no hash): linear(Vector3: x,y,z f64) + angular(Vector3: x,y,z f64) -pub fn encode_twist(cmd: &TwistCommand) -> Vec { - // 8 (hash) + 48 (6 doubles) = 56 bytes - let mut buf = Vec::with_capacity(56); - - // Fingerprint hash (big-endian) - buf.extend_from_slice(&TWIST_HASH.to_be_bytes()); - - // Twist._encodeNoHash: - // Vector3 linear: - buf.extend_from_slice(&cmd.linear_x.to_be_bytes()); - buf.extend_from_slice(&cmd.linear_y.to_be_bytes()); - buf.extend_from_slice(&cmd.linear_z.to_be_bytes()); - // Vector3 angular: - buf.extend_from_slice(&cmd.angular_x.to_be_bytes()); - buf.extend_from_slice(&cmd.angular_y.to_be_bytes()); - buf.extend_from_slice(&cmd.angular_z.to_be_bytes()); - - buf -} - -/// Builds a complete LCM UDP packet (short message format). -/// -/// Format: `[4B magic] [4B seqno] [channel\0] [payload]` -pub fn build_lcm_packet(channel: &str, payload: &[u8], seq: u32) -> Vec { - let channel_bytes = channel.as_bytes(); - let total = 4 + 4 + channel_bytes.len() + 1 + payload.len(); - let mut pkt = Vec::with_capacity(total); - - pkt.extend_from_slice(&LCM_MAGIC_SHORT.to_be_bytes()); - pkt.extend_from_slice(&seq.to_be_bytes()); - pkt.extend_from_slice(channel_bytes); - pkt.push(0); // null terminator - pkt.extend_from_slice(payload); - - pkt -} - -/// LCM publisher that sends PointStamped and Twist messages via UDP multicast. -pub struct LcmPublisher { - socket: UdpSocket, - seq: AtomicU32, - channel: String, -} - -impl LcmPublisher { - /// Create a new LCM publisher. - /// - /// `channel` is the LCM channel name, e.g. - /// `"/clicked_point#geometry_msgs.PointStamped"` or - /// `"/cmd_vel#geometry_msgs.Twist"`. - pub fn new(channel: String) -> std::io::Result { - let socket = UdpSocket::bind("0.0.0.0:0")?; - // TTL=0 means local machine only; TTL=1 for same subnet - socket.set_multicast_ttl_v4(0)?; - Ok(Self { - socket, - seq: AtomicU32::new(0), - channel, - }) - } - - /// Publish a click event as a PointStamped LCM message. - pub fn publish(&self, event: &ClickEvent) -> std::io::Result { - let payload = encode_point_stamped(event); - let seq = self.seq.fetch_add(1, Ordering::Relaxed); - let packet = build_lcm_packet(&self.channel, &payload, seq); - self.socket.send_to(&packet, LCM_MULTICAST_ADDR) - } - - /// Publish a twist command as a Twist LCM message. - pub fn publish_twist(&self, cmd: &TwistCommand) -> std::io::Result { - let payload = encode_twist(cmd); - let seq = self.seq.fetch_add(1, Ordering::Relaxed); - let packet = build_lcm_packet(&self.channel, &payload, seq); - self.socket.send_to(&packet, LCM_MULTICAST_ADDR) - } -} - -impl std::fmt::Debug for LcmPublisher { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LcmPublisher") - .field("channel", &self.channel) - .field("seq", &self.seq.load(Ordering::Relaxed)) - .finish() - } -} - -/// Create a `ClickEvent` from position, entity path, and a millisecond timestamp. -pub fn click_event_from_ms( - position: [f32; 3], - entity_path: &str, - timestamp_ms: u64, -) -> ClickEvent { - let total_secs = (timestamp_ms / 1000) as i32; - let nanos = ((timestamp_ms % 1000) * 1_000_000) as i32; - ClickEvent { - x: position[0] as f64, - y: position[1] as f64, - z: position[2] as f64, - entity_path: entity_path.to_string(), - timestamp_sec: total_secs, - timestamp_nsec: nanos, - } -} - -/// Create a `ClickEvent` from position and entity path, using the current time. -pub fn click_event_now(position: [f32; 3], entity_path: &str) -> ClickEvent { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default(); - ClickEvent { - x: position[0] as f64, - y: position[1] as f64, - z: position[2] as f64, - entity_path: entity_path.to_string(), - timestamp_sec: now.as_secs() as i32, - timestamp_nsec: now.subsec_nanos() as i32, - } -} - -/// Create a `TwistCommand` from velocity values. -pub fn twist_command( - linear: [f64; 3], - angular: [f64; 3], -) -> TwistCommand { - TwistCommand { - linear_x: linear[0], - linear_y: linear[1], - linear_z: linear[2], - angular_x: angular[0], - angular_y: angular[1], - angular_z: angular[2], - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_point_stamped_fingerprint() { - fn rot(h: u64) -> u64 { - (h.wrapping_shl(1)).wrapping_add((h >> 63) & 1) - } - let time_hash = rot(0xde1d24a3a8ecb648); - let header_hash = rot(0xdbb33f5b4c19b8ea_u64.wrapping_add(time_hash)); - let point_hash = rot(0x573f2fdd2f76508f); - let ps_hash = - rot(0xf012413a2c8028c2_u64 - .wrapping_add(header_hash) - .wrapping_add(point_hash)); - assert_eq!(ps_hash, POINT_STAMPED_HASH); - } - - #[test] - fn test_twist_fingerprint() { - fn rot(h: u64) -> u64 { - (h.wrapping_shl(1)).wrapping_add((h >> 63) & 1) - } - let vector3_hash = rot(0x573f2fdd2f76508f); - let twist_hash = rot(0x3a4144772922add7_u64 - .wrapping_add(vector3_hash) - .wrapping_add(vector3_hash)); - assert_eq!(twist_hash, TWIST_HASH); - } - - #[test] - fn test_encode_twist_matches_python() { - // Twist(Vector3(0.5, 0.0, 0.0), Vector3(0.0, 0.0, 0.3)) - let cmd = TwistCommand { - linear_x: 0.5, - linear_y: 0.0, - linear_z: 0.0, - angular_x: 0.0, - angular_y: 0.0, - angular_z: 0.3, - }; - - let encoded = encode_twist(&cmd); - - let expected_hex = "2e7c07d7cdf7e0273fe000000000000000000000000000000000000000000000000000000000000000000000000000003fd3333333333333"; - let expected: Vec = (0..expected_hex.len()) - .step_by(2) - .map(|i| u8::from_str_radix(&expected_hex[i..i + 2], 16).unwrap()) - .collect(); - - assert_eq!(encoded, expected, "Encoded bytes must match Python LCM output"); - assert_eq!(encoded.len(), 56, "Encoded length must be 56 bytes"); - } - - #[test] - fn test_encode_twist_zero() { - let cmd = TwistCommand { - linear_x: 0.0, - linear_y: 0.0, - linear_z: 0.0, - angular_x: 0.0, - angular_y: 0.0, - angular_z: 0.0, - }; - let encoded = encode_twist(&cmd); - assert_eq!(encoded.len(), 56); - let hash = u64::from_be_bytes(encoded[0..8].try_into().unwrap()); - assert_eq!(hash, TWIST_HASH); - } - - #[test] - fn test_encode_point_stamped_matches_python() { - let event = ClickEvent { - x: 1.5, - y: 2.5, - z: 3.5, - entity_path: "/world/grid".to_string(), - timestamp_sec: 1234, - timestamp_nsec: 5678, - }; - - let encoded = encode_point_stamped(&event); - - let expected_hex = "9cd764738ea629af00000000000004d20000162e0000000c2f776f726c642f67726964003ff80000000000004004000000000000400c000000000000"; - let expected: Vec = (0..expected_hex.len()) - .step_by(2) - .map(|i| u8::from_str_radix(&expected_hex[i..i + 2], 16).unwrap()) - .collect(); - - assert_eq!(encoded, expected, "Encoded bytes must match Python LCM output"); - } - - #[test] - fn test_encode_empty_frame_id() { - let event = ClickEvent { - x: 0.0, - y: 0.0, - z: 0.0, - entity_path: String::new(), - timestamp_sec: 0, - timestamp_nsec: 0, - }; - let encoded = encode_point_stamped(&event); - assert_eq!(encoded.len(), 49); - let str_len = i32::from_be_bytes([encoded[20], encoded[21], encoded[22], encoded[23]]); - assert_eq!(str_len, 1); - } - - #[test] - fn test_build_lcm_packet_format() { - let payload = vec![0xAA, 0xBB]; - let channel = "/test"; - let packet = build_lcm_packet(channel, &payload, 42); - assert_eq!(&packet[0..4], &LCM_MAGIC_SHORT.to_be_bytes()); - assert_eq!(&packet[4..8], &42u32.to_be_bytes()); - let null_pos = packet[8..].iter().position(|&b| b == 0).unwrap() + 8; - let channel_bytes = &packet[8..null_pos]; - assert_eq!(channel_bytes, b"/test"); - assert_eq!(&packet[null_pos + 1..], &[0xAA, 0xBB]); - } - - #[test] - fn test_build_lcm_packet_with_typed_channel() { - let payload = vec![0x01]; - let channel = "/clicked_point#geometry_msgs.PointStamped"; - let packet = build_lcm_packet(channel, &payload, 0); - let null_pos = packet[8..].iter().position(|&b| b == 0).unwrap() + 8; - let extracted_channel = std::str::from_utf8(&packet[8..null_pos]).unwrap(); - assert_eq!(extracted_channel, channel); - } - - #[test] - fn test_click_event_from_ms() { - let event = click_event_from_ms([1.0, 2.0, 3.0], "/world", 1234567); - assert_eq!(event.timestamp_sec, 1234); - assert_eq!(event.timestamp_nsec, 567_000_000); - assert_eq!(event.x, 1.0f64); - assert_eq!(event.entity_path, "/world"); - } - - #[test] - fn test_click_event_now() { - let event = click_event_now([0.0, 0.0, 0.0], "/test"); - let now_sec = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as i32; - assert!((event.timestamp_sec - now_sec).abs() < 10); - } - - #[test] - fn test_twist_command() { - let cmd = twist_command([1.0, 0.0, 0.0], [0.0, 0.0, 0.5]); - assert_eq!(cmd.linear_x, 1.0); - assert_eq!(cmd.angular_z, 0.5); - } - - #[test] - fn test_lcm_publisher_creation() { - let publisher = LcmPublisher::new("/clicked_point#geometry_msgs.PointStamped".to_string()); - assert!(publisher.is_ok()); - let publisher_twist = LcmPublisher::new("/cmd_vel#geometry_msgs.Twist".to_string()); - assert!(publisher_twist.is_ok()); - } - - #[test] - fn test_full_packet_structure() { - let event = ClickEvent { - x: 1.0, - y: 2.0, - z: 3.0, - entity_path: "/world/robot".to_string(), - timestamp_sec: 100, - timestamp_nsec: 200, - }; - let payload = encode_point_stamped(&event); - let channel = "/clicked_point#geometry_msgs.PointStamped"; - let packet = build_lcm_packet(channel, &payload, 7); - let magic = u32::from_be_bytes([packet[0], packet[1], packet[2], packet[3]]); - assert_eq!(magic, LCM_MAGIC_SHORT); - let seqno = u32::from_be_bytes([packet[4], packet[5], packet[6], packet[7]]); - assert_eq!(seqno, 7); - let null_pos = packet[8..].iter().position(|&b| b == 0).unwrap() + 8; - let ch = std::str::from_utf8(&packet[8..null_pos]).unwrap(); - assert_eq!(ch, channel); - let data_start = null_pos + 1; - let hash_bytes: [u8; 8] = packet[data_start..data_start + 8].try_into().unwrap(); - let hash = u64::from_be_bytes(hash_bytes); - assert_eq!(hash, POINT_STAMPED_HASH); - } - - #[test] - fn test_sequence_number_increments() { - let publisher = - LcmPublisher::new("/test#geometry_msgs.PointStamped".to_string()).unwrap(); - assert_eq!(publisher.seq.load(Ordering::Relaxed), 0); - let seq1 = publisher.seq.fetch_add(1, Ordering::Relaxed); - assert_eq!(seq1, 0); - let seq2 = publisher.seq.fetch_add(1, Ordering::Relaxed); - assert_eq!(seq2, 1); - } -} diff --git a/dimos/src/interaction/mod.rs b/dimos/src/interaction/mod.rs index 6fec89890734..7dfebdbb08c8 100644 --- a/dimos/src/interaction/mod.rs +++ b/dimos/src/interaction/mod.rs @@ -1,9 +1,9 @@ pub mod handle; pub mod keyboard; -pub mod lcm; pub mod protocol; +pub mod ws; pub use handle::InteractionHandle; pub use keyboard::KeyboardHandler; -pub use lcm::{ClickEvent, TwistCommand, LcmPublisher, click_event_from_ms, click_event_now, twist_command}; pub use protocol::ViewerEvent; +pub use ws::WsPublisher; diff --git a/dimos/src/interaction/ws.rs b/dimos/src/interaction/ws.rs new file mode 100644 index 000000000000..faec46d54d27 --- /dev/null +++ b/dimos/src/interaction/ws.rs @@ -0,0 +1,184 @@ +//! WebSocket client for remote (non-LCM) mode. +//! +//! When `dimos-viewer` is started with `--connect`, LCM multicast is unavailable +//! across machines. This module connects to a WebSocket server (typically the +//! Python `RerunWebSocketServer` module) and sends click, twist, and stop events +//! as JSON. +//! +//! Message format (JSON objects with a `"type"` discriminant): +//! +//! ```json +//! {"type":"click","x":1.0,"y":2.0,"z":3.0,"entity_path":"/world","timestamp_ms":1234567890} +//! {"type":"twist","linear_x":0.5,"linear_y":0.0,"linear_z":0.0,"angular_x":0.0,"angular_y":0.0,"angular_z":0.8} +//! {"type":"stop"} +//! ``` + +use std::time::Duration; + +use rerun::external::re_log; +use serde::Serialize; +use tokio::sync::mpsc; + +/// JSON message variants sent over the WebSocket. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WsEvent { + Click { + x: f64, + y: f64, + z: f64, + entity_path: String, + timestamp_ms: u64, + }, + Twist { + linear_x: f64, + linear_y: f64, + linear_z: f64, + angular_x: f64, + angular_y: f64, + angular_z: f64, + }, + Stop, +} + +/// Sends `WsEvent`s (serialised to JSON) to a remote WebSocket server. +/// +/// Maintains a persistent connection with automatic reconnection. The +/// internal sender is `Clone`, so you can hand copies to multiple producers +/// (keyboard handler, click handler, …). +#[derive(Clone)] +pub struct WsPublisher { + tx: mpsc::Sender, +} + +impl WsPublisher { + /// Spawn the WebSocket client task and return a publisher. + /// + /// The client connects to `url` (e.g. `ws://127.0.0.1:3030/ws`) and + /// reconnects automatically whenever the connection drops. + pub fn connect(url: String) -> Self { + let (tx, rx) = mpsc::channel::(256); + + tokio::spawn(async move { + run_client(url, rx).await; + }); + + Self { tx } + } + + /// Publish a click event. + pub fn send_click(&self, x: f64, y: f64, z: f64, entity_path: &str, timestamp_ms: u64) { + let event = WsEvent::Click { + x, + y, + z, + entity_path: entity_path.to_string(), + timestamp_ms, + }; + self.broadcast(event); + } + + /// Publish a twist (velocity) command. + pub fn send_twist( + &self, + linear_x: f64, + linear_y: f64, + linear_z: f64, + angular_x: f64, + angular_y: f64, + angular_z: f64, + ) { + let event = WsEvent::Twist { + linear_x, + linear_y, + linear_z, + angular_x, + angular_y, + angular_z, + }; + self.broadcast(event); + } + + /// Publish a stop command. + pub fn send_stop(&self) { + self.broadcast(WsEvent::Stop); + } + + fn broadcast(&self, event: WsEvent) { + if let Ok(json) = serde_json::to_string(&event) { + // Non-blocking: drop message if the channel is full rather than block the UI thread. + if self.tx.try_send(json).is_err() { + re_log::warn!("WsPublisher: send queue full, dropped event"); + } + } + } +} + +/// Background task: connect → send → reconnect loop. +async fn run_client(url: String, mut rx: mpsc::Receiver) { + use futures_util::{SinkExt, StreamExt}; + use tokio_tungstenite::{connect_async, tungstenite::Message}; + + loop { + re_log::info!("WsPublisher: connecting to {url}"); + + match connect_async(&url).await { + Ok((ws_stream, _)) => { + re_log::info!("WsPublisher: connected to {url}"); + + let (mut writer, mut reader) = ws_stream.split(); + + // Read task: consume incoming frames (ping → auto pong) so the + // server's keepalive pings get answered and the connection stays + // alive. Exits when the server closes or an error occurs. + let mut read_handle = tokio::spawn(async move { + while let Some(frame) = reader.next().await { + match frame { + Ok(Message::Close(_)) => break, + Err(err) => { + re_log::debug!("WsPublisher: read error: {err}"); + break; + } + _ => {} // Ping/Pong handled by tungstenite internally + } + } + }); + + // Write loop: drain the channel into the WebSocket. + let disconnected = loop { + tokio::select! { + msg = rx.recv() => { + match msg { + Some(text) => { + if let Err(err) = writer.send(Message::text(text)).await { + re_log::warn!("WsPublisher: send error: {err} — reconnecting"); + break false; + } + } + None => break true, // rx closed → task is done + } + } + _ = &mut read_handle => { + // Reader exited → server closed the connection. + re_log::warn!("WsPublisher: server closed connection — reconnecting"); + break false; + } + } + }; + + if disconnected { + break; + } + } + Err(err) => { + re_log::debug!("WsPublisher: connection failed: {err} — retrying in 1s"); + } + } + + // Drain any stale commands queued during the disconnect — sending + // outdated velocity commands on reconnect would be dangerous. + while rx.try_recv().is_ok() {} + + tokio::time::sleep(Duration::from_secs(1)).await; + } +} diff --git a/dimos/src/viewer.rs b/dimos/src/viewer.rs index 7af7282ef188..2aaff4075dac 100644 --- a/dimos/src/viewer.rs +++ b/dimos/src/viewer.rs @@ -3,23 +3,26 @@ use std::rc::Rc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use clap::Parser; -use dimos_viewer::interaction::{LcmPublisher, KeyboardHandler, click_event_from_ms}; -use rerun::external::{eframe, egui, re_crash_handler, re_grpc_server, re_log, re_memory, re_viewer}; +use dimos_viewer::interaction::{KeyboardHandler, WsPublisher}; +use rerun::external::{eframe, egui, re_crash_handler, re_grpc_client, re_grpc_server, re_log, re_memory, re_uri, re_viewer}; #[global_allocator] static GLOBAL: re_memory::AccountingAllocator = re_memory::AccountingAllocator::new(mimalloc::MiMalloc); -/// LCM channel for click events (follows RViz convention) -const LCM_CHANNEL: &str = "/clicked_point#geometry_msgs.PointStamped"; /// Minimum time between click events (debouncing) const CLICK_DEBOUNCE_MS: u64 = 100; + /// Maximum rapid clicks to log as warning const RAPID_CLICK_THRESHOLD: usize = 5; + /// Default gRPC listen port (9877 to avoid conflict with stock Rerun on 9876) const DEFAULT_PORT: u16 = 9877; -/// DimOS Interactive Viewer — a custom Rerun viewer with LCM click-to-navigate. +/// Default WebSocket server URL to connect to in --connect mode +const DEFAULT_WS_URL: &str = "ws://127.0.0.1:3030/ws"; + +/// DimOS Interactive Viewer — a custom Rerun viewer with click-to-navigate. /// /// Accepts the same CLI flags as the stock `rerun` binary so it can be spawned /// seamlessly via `rerun_bindings.spawn(executable_name="dimos-viewer")`. @@ -48,6 +51,27 @@ struct Args { /// Hint that data will arrive shortly (suppresses "waiting for data" message). #[arg(long)] expect_data_soon: bool, + + /// Do not start a local gRPC server; instead connect to an existing one. + /// + /// Optionally accepts a URL to a gRPC proxy server. + /// The scheme must be one of `rerun://`, `rerun+http://`, or `rerun+https://`, + /// and the pathname must be `/proxy`. + /// + /// Defaults to `rerun+http://127.0.0.1:/proxy`. + #[arg(long)] + connect: Option>, + + /// WebSocket server URL to connect to for publishing click/keyboard events. + /// + /// The viewer connects as a WebSocket CLIENT to this URL and sends JSON + /// events (click, twist, stop). The server is typically the Python + /// `RerunWebSocketServer` DimOS module. + /// + /// When provided explicitly this flag overrides the default URL. + /// When omitted, defaults to ws://127.0.0.1:3030/ws. + #[arg(long)] + ws_url: Option, } /// Wraps re_viewer::App to add keyboard control interception. @@ -73,7 +97,7 @@ impl eframe::App for DimosApp { // Process keyboard input before delegating to Rerun self.keyboard.process(ui.ctx()); - // Always draw the keyboard HUD overlay (dims when inactive) + // Draw the keyboard HUD overlay (click to engage/disengage) self.keyboard.draw_overlay(ui.ctx()); // Delegate to Rerun's main ui method @@ -110,29 +134,36 @@ async fn main() -> Result<(), Box> { re_log::setup_logging(); re_crash_handler::install_crash_handlers(re_viewer::build_info()); - // Listen for gRPC connections from Rerun's logging SDKs. - let listen_addr = format!("0.0.0.0:{}", args.port); - re_log::info!("Listening for SDK connections on {listen_addr}"); - let server_memory_limit = re_memory::MemoryLimit::parse(&args.server_memory_limit) - .expect("Bad --server-memory-limit"); - let rx_log = re_grpc_server::spawn_with_recv( - listen_addr.parse()?, - re_grpc_server::ServerOptions { - memory_limit: server_memory_limit, - ..Default::default() - }, - re_grpc_server::shutdown::never(), - ); - - // Create LCM publisher for click events - let lcm_publisher = LcmPublisher::new(LCM_CHANNEL.to_string()) - .expect("Failed to create LCM publisher"); - re_log::info!("LCM publisher created for channel: {LCM_CHANNEL}"); - - // Create keyboard handler - let keyboard_handler = KeyboardHandler::new() - .expect("Failed to create keyboard handler"); - re_log::info!("Keyboard handler initialized for WASD controls on /cmd_vel"); + // Either connect to an existing gRPC proxy, or spawn a local server. + let rx_log = if let Some(url) = args.connect.clone() { + let url = url.unwrap_or_else(|| format!("rerun+http://127.0.0.1:{}/proxy", args.port)); + let proxy_uri: re_uri::ProxyUri = url.parse() + .expect("Bad --connect URL: expected format rerun+http://host:port/proxy"); + re_log::info!("Connecting to existing gRPC proxy at {proxy_uri}"); + re_grpc_client::stream(proxy_uri) + } else { + let listen_addr = format!("0.0.0.0:{}", args.port); + re_log::info!("Listening for SDK connections on {listen_addr}"); + let server_memory_limit = re_memory::MemoryLimit::parse(&args.server_memory_limit) + .expect("Bad --server-memory-limit"); + re_grpc_server::spawn_with_recv( + listen_addr.parse()?, + re_grpc_server::ServerOptions { + memory_limit: server_memory_limit, + ..Default::default() + }, + re_grpc_server::shutdown::never(), + ) + }; + + // Resolve the WebSocket URL (always WebSocket, no LCM): + // - explicit --ws-url → use that URL + // - otherwise → default URL + let ws_url = args.ws_url.unwrap_or_else(|| DEFAULT_WS_URL.to_string()); + let ws_publisher = WsPublisher::connect(ws_url.clone()); + re_log::info!("WebSocket client connecting to {ws_url}"); + let keyboard_handler = KeyboardHandler::new(ws_publisher.clone()); + // State for debouncing and rapid click detection let last_click_time = Rc::new(RefCell::new(Instant::now())); @@ -195,27 +226,20 @@ async fn main() -> Result<(), Box> { .unwrap_or_default() .as_millis() as u64; - // Build click event and publish via LCM - let click = click_event_from_ms( - [pos.x, pos.y, pos.z], + ws_publisher.send_click( + pos.x as f64, + pos.y as f64, + pos.z as f64, &entity_path.to_string(), timestamp_ms, ); - - match lcm_publisher.publish(&click) { - Ok(_) => { - re_log::debug!( - "LCM click event published: entity={}, pos=({:.2}, {:.2}, {:.2})", - entity_path, - pos.x, - pos.y, - pos.z - ); - } - Err(err) => { - re_log::error!("Failed to publish LCM click event: {err:?}"); - } - } + re_log::debug!( + "Click event published: entity={}, pos=({:.2}, {:.2}, {:.2})", + entity_path, + pos.x, + pos.y, + pos.z + ); } re_viewer::SelectionChangeItem::Entity { position: None, .. } => { no_position_count += 1; diff --git a/dimos/test_ws_client.ts b/dimos/test_ws_client.ts new file mode 100644 index 000000000000..c038f8881dfb --- /dev/null +++ b/dimos/test_ws_client.ts @@ -0,0 +1,52 @@ +/** + * Deno WebSocket test client for dimos-viewer event stream. + * + * Run with: + * deno run --allow-net dimos/test_ws_client.ts + * + * Or with a custom address: + * deno run --allow-net dimos/test_ws_client.ts ws://127.0.0.1:3030/ws + */ + +const url = Deno.args[0] ?? "ws://127.0.0.1:3030/ws"; + +console.log(`Connecting to ${url} …`); + +const ws = new WebSocket(url); + +ws.addEventListener("open", () => { + console.log(`[connected] Listening for events from dimos-viewer`); +}); + +ws.addEventListener("message", (ev) => { + const ts = new Date().toISOString(); + try { + const msg = JSON.parse(ev.data as string); + if (msg.type === "heartbeat") { + console.log(`[${ts}] heartbeat ts=${msg.timestamp_ms}`); + } else if (msg.type === "click") { + console.log( + `[${ts}] click x=${msg.x.toFixed(3)} y=${msg.y.toFixed(3)} z=${msg.z.toFixed(3)} entity="${msg.entity_path}"`, + ); + } else if (msg.type === "twist") { + console.log( + `[${ts}] twist lin=(${msg.linear_x.toFixed(2)}, ${msg.linear_y.toFixed(2)}, ${msg.linear_z.toFixed(2)}) ang=(${msg.angular_x.toFixed(2)}, ${msg.angular_y.toFixed(2)}, ${msg.angular_z.toFixed(2)})`, + ); + } else if (msg.type === "stop") { + console.log(`[${ts}] stop`); + } else { + console.log(`[${ts}] unknown `, msg); + } + } catch { + console.log(`[${ts}] raw:`, ev.data); + } +}); + +ws.addEventListener("close", (ev) => { + console.log(`[disconnected] code=${ev.code} reason=${ev.reason}`); + Deno.exit(0); +}); + +ws.addEventListener("error", (ev) => { + console.error(`[error]`, ev); +}); diff --git a/dimos/uv.lock b/dimos/uv.lock new file mode 100644 index 000000000000..f135a581b73b --- /dev/null +++ b/dimos/uv.lock @@ -0,0 +1,8 @@ +version = 1 +revision = 3 +requires-python = ">=3.10" + +[[package]] +name = "dimos-viewer" +version = "0.30.0a4" +source = { editable = "." } diff --git a/uv.lock b/uv.lock index 4acb800535c1..8091c36a1a97 100644 --- a/uv.lock +++ b/uv.lock @@ -4727,7 +4727,7 @@ provides-extras = ["all", "datafusion", "dataplatform", "notebook", "tests"] [[package]] name = "rerun-workspace" -version = "0.28.0a1+dev" +version = "0.30.0a4" source = { virtual = "." } [package.dev-dependencies]