From 3b6c2acca18cf0ddad0db1dbc187e5679af533da Mon Sep 17 00:00:00 2001 From: Cyril Fougeray Date: Thu, 18 Dec 2025 18:15:40 +0100 Subject: [PATCH] feat(zenoh): experimental tooling add sub & query commands, with remote conn possible by passing `--connect IP` rename orb-zenoh-rpc to orb-zenoh --- Cargo.lock | 2 +- experiments/zenoh/Cargo.toml | 2 +- experiments/zenoh/src/main.rs | 158 ++++++++++++++++++++++++++++++++++ 3 files changed, 160 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 87ae1a99c..0ccebdc04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8845,7 +8845,7 @@ dependencies = [ ] [[package]] -name = "orb-zenoh-rpc" +name = "orb-zenoh" version = "0.0.0" dependencies = [ "async-trait", diff --git a/experiments/zenoh/Cargo.toml b/experiments/zenoh/Cargo.toml index 6f709ea87..b73117199 100644 --- a/experiments/zenoh/Cargo.toml +++ b/experiments/zenoh/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "orb-zenoh-rpc" +name = "orb-zenoh" version = "0.0.0" description = "Experiments with the zenoh messaging system" publish = false diff --git a/experiments/zenoh/src/main.rs b/experiments/zenoh/src/main.rs index 43cab4a26..1b73a9567 100644 --- a/experiments/zenoh/src/main.rs +++ b/experiments/zenoh/src/main.rs @@ -13,6 +13,26 @@ enum Args { #[clap(long)] use_contiguous: bool, }, + Sub { + #[clap(long)] + key: String, + #[clap(long)] + connect: Option, + }, + Query { + #[clap(long)] + key: String, + #[clap(long)] + connect: Option, + }, + Pub { + #[clap(long)] + key: String, + #[clap(long)] + value: String, + #[clap(long)] + connect: Option, + }, } #[tokio::main] @@ -25,11 +45,149 @@ async fn main() -> color_eyre::Result<()> { let result = match args { Args::Alice { .. } => alice(args).await, Args::Bob { .. } => bob(args).await, + Args::Sub { .. } => sub(args).await, + Args::Query { .. } => query(args).await, + Args::Pub { .. } => publish(args).await, }; telemetry.flush().await; result } +async fn sub(args: Args) -> color_eyre::Result<()> { + let Args::Sub { + key: zenoh_key, + connect, + } = args + else { + unreachable!() + }; + + let connect_config = connect + .map(|addr| format!(r#"connect: {{ endpoints: ["tcp/{}"] }},"#, addr)) + .unwrap_or_default(); + + let cfg = zenoh::Config::from_json5(&format!( + r#"{{ + mode: "peer", + {connect_config} + }}"# + )) + .unwrap_or_else(|e| panic!("failed to parse config: {}", e)); + + let session = zenoh::open(cfg) + .await + .unwrap_or_else(|e| panic!("failed to open session: {}", e)); + let sub = session + .declare_subscriber(&zenoh_key) + .await + .unwrap_or_else(|e| panic!("failed to declare subscriber: {}", e)); + + tracing::info!("Subscribed to {zenoh_key}"); + while let Ok(sample) = sub.recv_async().await { + if let Ok(payload_str) = sample.payload().try_to_string() { + tracing::info!("recv key={} payload={:?}", sample.key_expr(), payload_str); + } else { + tracing::info!( + "recv key={} payload={:?}", + sample.key_expr(), + sample.payload() + ); + } + } + + Ok(()) +} + +async fn query(args: Args) -> color_eyre::Result<()> { + let Args::Query { + key: zenoh_key, + connect, + } = args + else { + unreachable!() + }; + + let connect_config = connect + .map(|addr| format!(r#"connect: {{ endpoints: ["tcp/{}"] }},"#, addr)) + .unwrap_or_default(); + + let cfg = zenoh::Config::from_json5(&format!( + r#"{{ + mode: "peer", + {connect_config} + }}"# + )) + .unwrap_or_else(|e| panic!("failed to parse config: {}", e)); + + let session = zenoh::open(cfg) + .await + .unwrap_or_else(|e| panic!("failed to open session: {}", e)); + + tracing::info!("Querying key: {zenoh_key}"); + let replies = session + .get(&zenoh_key) + .await + .unwrap_or_else(|e| panic!("failed to query: {}", e)); + + while let Ok(reply) = replies.recv_async().await { + match reply.result() { + Ok(sample) => { + if let Ok(payload_str) = sample.payload().try_to_string() { + println!("key={} payload={}", sample.key_expr(), payload_str); + } else { + println!( + "key={} payload={:?}", + sample.key_expr(), + sample.payload() + ); + } + } + Err(err) => { + tracing::warn!("Query error for key {}: {:?}", zenoh_key, err); + } + } + } + + Ok(()) +} + +async fn publish(args: Args) -> color_eyre::Result<()> { + let Args::Pub { + key: zenoh_key, + value, + connect, + } = args + else { + unreachable!() + }; + + let connect_config = connect + .map(|addr| format!(r#"connect: {{ endpoints: ["tcp/{}"] }},"#, addr)) + .unwrap_or_default(); + + let cfg = zenoh::Config::from_json5(&format!( + r#"{{ + mode: "peer", + {connect_config} + }}"# + )) + .unwrap_or_else(|e| panic!("failed to parse config: {}", e)); + + let session = zenoh::open(cfg) + .await + .unwrap_or_else(|e| panic!("failed to open session: {}", e)); + + tracing::info!("Publishing to key: {zenoh_key}"); + session + .put(&zenoh_key, value.as_bytes()) + .await + .unwrap_or_else(|e| panic!("failed to put: {}", e)); + + tracing::info!("Published value: {value}"); + + Ok(()) +} + async fn alice(args: Args) -> Result<()> { let Args::Alice { payload_size } = args else { unreachable!()