diff --git a/Cargo.lock b/Cargo.lock index 93cb1bf..64eb313 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1477,7 +1477,7 @@ checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" [[package]] name = "xconn" version = "0.1.0" -source = "git+https://github.com/xconnio/xconn-rust.git?rev=2bf55ac6f0e2a7e8a3bb6212f74bf6b87a3d54d5#2bf55ac6f0e2a7e8a3bb6212f74bf6b87a3d54d5" +source = "git+https://github.com/xconnio/xconn-rust.git?rev=7357b73f179d58400849c66ce2052b41fa2015ba#7357b73f179d58400849c66ce2052b41fa2015ba" dependencies = [ "async-trait", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index bddbd71..064a039 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,5 +12,5 @@ serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" tokio = { version = "1.49.0", features = ["full"] } wampproto = { git = "https://github.com/xconnio/wampproto-rust.git", rev = "520130fa02343409578879959748b36f151bbc8d" } -xconn = { git = "https://github.com/xconnio/xconn-rust.git", rev = "2bf55ac6f0e2a7e8a3bb6212f74bf6b87a3d54d5" } +xconn = { git = "https://github.com/xconnio/xconn-rust.git", rev = "7357b73f179d58400849c66ce2052b41fa2015ba" } diff --git a/src/commands/call.rs b/src/commands/call.rs index 47307e5..c5c88d7 100644 --- a/src/commands/call.rs +++ b/src/commands/call.rs @@ -67,7 +67,6 @@ async fn run_session( let session = match conn_config.connect().await { Ok(s) => s, Err(e) => { - eprintln!("{}", e); colored_eprintln!( "{}", format_connect_error(session_id, call_config.parallel, e.as_ref()) diff --git a/src/commands/register.rs b/src/commands/register.rs index 8ad09d2..62dbeee 100644 --- a/src/commands/register.rs +++ b/src/commands/register.rs @@ -1,8 +1,8 @@ use crate::colored_eprintln; use crate::colored_println; -use crate::utils::{CommandOutput, wamp_async_value_to_serde}; +use crate::config::ConnectionConfig; +use crate::utils::{CommandOutput, format_connect_error, wamp_async_value_to_serde}; use tokio::signal; -use xconn::async_::session::Session; use xconn::async_::{Invocation, RegisterRequest, Yield}; async fn registration_handler(inv: Invocation) -> Yield { @@ -23,7 +23,18 @@ async fn registration_handler(inv: Invocation) -> Yield { Yield::new(inv.args, inv.kwargs) } -pub async fn handle(session: &Session, procedure: &str) -> Result<(), Box> { +pub async fn handle( + conn_config: ConnectionConfig, + procedure: &str, +) -> Result<(), Box> { + let session = match conn_config.connect().await { + Ok(s) => s, + Err(e) => { + colored_eprintln!("{}", format_connect_error(1, 1, e.as_ref())); + return Ok(()); + } + }; + let register_request = RegisterRequest::new(procedure, registration_handler); match session.register(register_request).await { @@ -41,8 +52,16 @@ pub async fn handle(session: &Session, procedure: &str) -> Result<(), Box { + colored_println!("Exiting..."); + } + _ = session.wait_disconnect() => { + colored_eprintln!("Lost connection to router"); + } + } + + let _ = session.leave().await; Ok(()) } diff --git a/src/commands/subscribe.rs b/src/commands/subscribe.rs index 1dddc26..c736cd3 100644 --- a/src/commands/subscribe.rs +++ b/src/commands/subscribe.rs @@ -3,6 +3,7 @@ use crate::colored_println; use crate::config::{ConnectionConfig, SubscribeConfig}; use crate::utils::{CommandOutput, format_connect_error, wamp_async_value_to_serde}; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use tokio::signal; use tokio::sync::Semaphore; use xconn::async_::{Event, SubscribeRequest}; @@ -36,6 +37,8 @@ async fn run_session( subscribe_config: Arc, session_id: u32, shutdown: tokio::sync::watch::Receiver, + disconnect_tx: tokio::sync::mpsc::Sender<()>, + ctrl_c_printed: Arc, ) { let session = match conn_config.connect().await { Ok(s) => s, @@ -67,6 +70,11 @@ async fn run_session( } else { colored_println!("Subscribed to topic '{}'", subscribe_config.topic); } + + // Print "Press Ctrl+C to exit" only once across all sessions + if !ctrl_c_printed.swap(true, Ordering::Relaxed) { + colored_println!("Press Ctrl+C to exit"); + } } Err(e) => { colored_eprintln!("Session {} Subscribe Error: {}", session_id, e); @@ -75,11 +83,16 @@ async fn run_session( } } - // Wait for shutdown signal + // Wait for either shutdown signal or connection loss let mut shutdown = shutdown; - let _ = shutdown.changed().await; + let disconnected = tokio::select! { + _ = shutdown.changed() => false, + _ = session.wait_disconnect() => true, + }; - if let Err(e) = session.leave().await { + if disconnected { + let _ = disconnect_tx.send(()).await; + } else if let Err(e) = session.leave().await { colored_eprintln!("Session {} Error leaving: {}", session_id, e); } } @@ -95,6 +108,11 @@ pub async fn handle( // Create shutdown channel let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + // Create disconnect notification channel + let (disconnect_tx, mut disconnect_rx) = tokio::sync::mpsc::channel::<()>(1); + + let ctrl_c_printed = Arc::new(AtomicBool::new(false)); + let mut handles = Vec::with_capacity(subscribe_config.parallel as usize); for session_id in 1..=subscribe_config.parallel { @@ -102,24 +120,51 @@ pub async fn handle( let conn_config = conn_config.clone(); let subscribe_config = subscribe_config.clone(); let shutdown_rx = shutdown_rx.clone(); + let disconnect_tx = disconnect_tx.clone(); + let ctrl_c_printed = ctrl_c_printed.clone(); let handle = tokio::spawn(async move { let _permit = permit; - run_session(conn_config, subscribe_config, session_id, shutdown_rx).await; + run_session( + conn_config, + subscribe_config, + session_id, + shutdown_rx, + disconnect_tx, + ctrl_c_printed, + ) + .await; }); handles.push(handle); } - colored_println!("Press Ctrl+C to exit"); - signal::ctrl_c().await?; - colored_println!("Exiting..."); + // Spawn a task to track when all sessions finish + let mut join_handle = tokio::spawn(async move { + for handle in handles { + let _ = handle.await; + } + }); + + tokio::select! { + _ = signal::ctrl_c() => { + colored_println!("Exiting..."); + } + _ = disconnect_rx.recv() => { + colored_eprintln!("Lost connection to router"); + } + _ = &mut join_handle => { + // All sessions ended (e.g., all failed to connect) + // Error messages already printed in run_session + } + } - // Signal all sessions to shutdown + // Signal remaining sessions to shutdown let _ = shutdown_tx.send(true); + drop(disconnect_tx); - for handle in handles { - let _ = handle.await; + if !join_handle.is_finished() { + let _ = join_handle.await; } Ok(()) diff --git a/src/main.rs b/src/main.rs index 095ecd7..672165b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,9 +39,7 @@ async fn main() -> Result<(), Box> { commands::call::handle(conn_config, call_config).await?; } Commands::Register { procedure } => { - let session = conn_config.connect().await?; - commands::register::handle(&session, &procedure).await?; - session.leave().await?; + commands::register::handle(conn_config, &procedure).await?; } Commands::Subscribe { topic,