Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.149"
tokio = { version = "1.49.0", features = ["full"] }
wampproto = { git = "https://github.com/xconnio/wampproto-rust.git", rev = "520130fa02343409578879959748b36f151bbc8d" }
xconn = { git = "https://github.com/xconnio/xconn-rust.git", rev = "2bf55ac6f0e2a7e8a3bb6212f74bf6b87a3d54d5" }
xconn = { git = "https://github.com/xconnio/xconn-rust.git", rev = "7357b73f179d58400849c66ce2052b41fa2015ba" }

1 change: 0 additions & 1 deletion src/commands/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ async fn run_session(
let session = match conn_config.connect().await {
Ok(s) => s,
Err(e) => {
eprintln!("{}", e);
colored_eprintln!(
"{}",
format_connect_error(session_id, call_config.parallel, e.as_ref())
Expand Down
29 changes: 24 additions & 5 deletions src/commands/register.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::colored_eprintln;
use crate::colored_println;
use crate::utils::{CommandOutput, wamp_async_value_to_serde};
use crate::config::ConnectionConfig;
use crate::utils::{CommandOutput, format_connect_error, wamp_async_value_to_serde};
use tokio::signal;
use xconn::async_::session::Session;
use xconn::async_::{Invocation, RegisterRequest, Yield};

async fn registration_handler(inv: Invocation) -> Yield {
Expand All @@ -23,7 +23,18 @@ async fn registration_handler(inv: Invocation) -> Yield {
Yield::new(inv.args, inv.kwargs)
}

pub async fn handle(session: &Session, procedure: &str) -> Result<(), Box<dyn std::error::Error>> {
pub async fn handle(
conn_config: ConnectionConfig,
procedure: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let session = match conn_config.connect().await {
Ok(s) => s,
Err(e) => {
colored_eprintln!("{}", format_connect_error(1, 1, e.as_ref()));
return Ok(());
}
};

let register_request = RegisterRequest::new(procedure, registration_handler);

match session.register(register_request).await {
Expand All @@ -41,8 +52,16 @@ pub async fn handle(session: &Session, procedure: &str) -> Result<(), Box<dyn st
}

colored_println!("Press Ctrl+C to exit");
signal::ctrl_c().await?;
colored_println!("Exiting...");
tokio::select! {
_ = signal::ctrl_c() => {
colored_println!("Exiting...");
}
_ = session.wait_disconnect() => {
colored_eprintln!("Lost connection to router");
}
}

let _ = session.leave().await;

Ok(())
}
65 changes: 55 additions & 10 deletions src/commands/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::colored_println;
use crate::config::{ConnectionConfig, SubscribeConfig};
use crate::utils::{CommandOutput, format_connect_error, wamp_async_value_to_serde};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::signal;
use tokio::sync::Semaphore;
use xconn::async_::{Event, SubscribeRequest};
Expand Down Expand Up @@ -36,6 +37,8 @@ async fn run_session(
subscribe_config: Arc<SubscribeConfig>,
session_id: u32,
shutdown: tokio::sync::watch::Receiver<bool>,
disconnect_tx: tokio::sync::mpsc::Sender<()>,
ctrl_c_printed: Arc<AtomicBool>,
) {
let session = match conn_config.connect().await {
Ok(s) => s,
Expand Down Expand Up @@ -67,6 +70,11 @@ async fn run_session(
} else {
colored_println!("Subscribed to topic '{}'", subscribe_config.topic);
}

// Print "Press Ctrl+C to exit" only once across all sessions
if !ctrl_c_printed.swap(true, Ordering::Relaxed) {
colored_println!("Press Ctrl+C to exit");
}
}
Err(e) => {
colored_eprintln!("Session {} Subscribe Error: {}", session_id, e);
Expand All @@ -75,11 +83,16 @@ async fn run_session(
}
}

// Wait for shutdown signal
// Wait for either shutdown signal or connection loss
let mut shutdown = shutdown;
let _ = shutdown.changed().await;
let disconnected = tokio::select! {
_ = shutdown.changed() => false,
_ = session.wait_disconnect() => true,
};

if let Err(e) = session.leave().await {
if disconnected {
let _ = disconnect_tx.send(()).await;
} else if let Err(e) = session.leave().await {
colored_eprintln!("Session {} Error leaving: {}", session_id, e);
}
}
Expand All @@ -95,31 +108,63 @@ pub async fn handle(
// Create shutdown channel
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

// Create disconnect notification channel
let (disconnect_tx, mut disconnect_rx) = tokio::sync::mpsc::channel::<()>(1);

let ctrl_c_printed = Arc::new(AtomicBool::new(false));

let mut handles = Vec::with_capacity(subscribe_config.parallel as usize);

for session_id in 1..=subscribe_config.parallel {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let conn_config = conn_config.clone();
let subscribe_config = subscribe_config.clone();
let shutdown_rx = shutdown_rx.clone();
let disconnect_tx = disconnect_tx.clone();
let ctrl_c_printed = ctrl_c_printed.clone();

let handle = tokio::spawn(async move {
let _permit = permit;
run_session(conn_config, subscribe_config, session_id, shutdown_rx).await;
run_session(
conn_config,
subscribe_config,
session_id,
shutdown_rx,
disconnect_tx,
ctrl_c_printed,
)
.await;
});

handles.push(handle);
}

colored_println!("Press Ctrl+C to exit");
signal::ctrl_c().await?;
colored_println!("Exiting...");
// Spawn a task to track when all sessions finish
let mut join_handle = tokio::spawn(async move {
for handle in handles {
let _ = handle.await;
}
});

tokio::select! {
_ = signal::ctrl_c() => {
colored_println!("Exiting...");
}
_ = disconnect_rx.recv() => {
colored_eprintln!("Lost connection to router");
}
_ = &mut join_handle => {
// All sessions ended (e.g., all failed to connect)
// Error messages already printed in run_session
}
}

// Signal all sessions to shutdown
// Signal remaining sessions to shutdown
let _ = shutdown_tx.send(true);
drop(disconnect_tx);

for handle in handles {
let _ = handle.await;
if !join_handle.is_finished() {
let _ = join_handle.await;
}

Ok(())
Expand Down
4 changes: 1 addition & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
commands::call::handle(conn_config, call_config).await?;
}
Commands::Register { procedure } => {
let session = conn_config.connect().await?;
commands::register::handle(&session, &procedure).await?;
session.leave().await?;
commands::register::handle(conn_config, &procedure).await?;
}
Commands::Subscribe {
topic,
Expand Down
Loading