From 9d3d787dbb61323e857aaf130678a3176e136841 Mon Sep 17 00:00:00 2001 From: Ismail Akram Date: Tue, 13 Jan 2026 20:21:58 +0500 Subject: [PATCH] Add error handling --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/commands/call.rs | 37 +++++++++++++++++++++++-------- src/commands/publish.rs | 30 +++++++++++++++++++------ src/commands/register.rs | 19 ++++++++++++---- src/commands/subscribe.rs | 33 +++++++++++++++++++--------- src/utils.rs | 46 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 137 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26bad73..f61baab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1477,7 +1477,7 @@ checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" [[package]] name = "xconn" version = "0.1.0" -source = "git+https://github.com/xconnio/xconn-rust.git?rev=484b2deade287937d3c3a65b6ad9d8b2ae2dd2d1#484b2deade287937d3c3a65b6ad9d8b2ae2dd2d1" +source = "git+https://github.com/xconnio/xconn-rust.git?rev=2bf55ac6f0e2a7e8a3bb6212f74bf6b87a3d54d5#2bf55ac6f0e2a7e8a3bb6212f74bf6b87a3d54d5" dependencies = [ "async-trait", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index 5705317..271f2d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,5 +12,5 @@ serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" tokio = { version = "1.49.0", features = ["full"] } wampproto = { git = "https://github.com/xconnio/wampproto-rust.git", rev = "520130fa02343409578879959748b36f151bbc8d" } -xconn = { git = "https://github.com/xconnio/xconn-rust.git", rev = "484b2deade287937d3c3a65b6ad9d8b2ae2dd2d1" } +xconn = { git = "https://github.com/xconnio/xconn-rust.git", rev = "2bf55ac6f0e2a7e8a3bb6212f74bf6b87a3d54d5" } diff --git a/src/commands/call.rs b/src/commands/call.rs index bda617a..47307e5 100644 --- a/src/commands/call.rs +++ b/src/commands/call.rs @@ -1,5 +1,8 @@ +use crate::colored_eprintln; use crate::config::{CallConfig, ConnectionConfig}; -use crate::utils::{CommandOutput, ParsedArg, parse_arg, wamp_value_to_serde}; +use crate::utils::{ + CommandOutput, ParsedArg, format_connect_error, parse_arg, wamp_value_to_serde, +}; use std::sync::Arc; use tokio::sync::Semaphore; use xconn::sync::CallRequest; @@ -64,7 +67,11 @@ async fn run_session( let session = match conn_config.connect().await { Ok(s) => s, Err(e) => { - eprintln!("Session {} Connection Error: {}", session_id, e); + eprintln!("{}", e); + colored_eprintln!( + "{}", + format_connect_error(session_id, call_config.parallel, e.as_ref()) + ); return; } }; @@ -74,6 +81,11 @@ async fn run_session( match session.call(request).await { Ok(result) => { + if let Some(err) = result.error { + colored_eprintln!("{}", err.uri); + break; + } + let output = CommandOutput { args: result .args @@ -96,21 +108,28 @@ async fn run_session( }; match serde_json::to_string_pretty(&output) { Ok(json) => println!("{}", json), - Err(e) => eprintln!( + Err(e) => colored_eprintln!( "Session {} Iteration {} Error serializing result: {}", - session_id, iteration, e + session_id, + iteration, + e ), } } - Err(e) => eprintln!( - "Session {} Iteration {} Call Error: {}", - session_id, iteration, e - ), + Err(e) => { + colored_eprintln!( + "Session {} Iteration {} Call Error: {}", + session_id, + iteration, + e + ); + break; + } } } if let Err(e) = session.leave().await { - eprintln!("Session {} Error leaving: {}", session_id, e); + colored_eprintln!("Session {} Error leaving: {}", session_id, e); } } diff --git a/src/commands/publish.rs b/src/commands/publish.rs index d4cfefb..cb9b9e7 100644 --- a/src/commands/publish.rs +++ b/src/commands/publish.rs @@ -1,5 +1,6 @@ +use crate::colored_eprintln; use crate::config::{ConnectionConfig, PublishConfig}; -use crate::utils::{ParsedArg, parse_arg}; +use crate::utils::{ParsedArg, format_connect_error, parse_arg}; use std::sync::Arc; use tokio::sync::Semaphore; use xconn::sync::PublishRequest; @@ -69,7 +70,10 @@ async fn run_session( let session = match conn_config.connect().await { Ok(s) => s, Err(e) => { - eprintln!("Session {} Connection Error: {}", session_id, e); + colored_eprintln!( + "{}", + format_connect_error(session_id, publish_config.parallel, e.as_ref()) + ); return; } }; @@ -78,11 +82,23 @@ async fn run_session( let request = build_publish_request(&publish_config); match session.publish(request).await { - Ok(_) => {} - Err(e) => eprintln!( - "Session {} Iteration {} Publish Error: {}", - session_id, iteration, e - ), + Ok(response) => { + if let Some(resp) = response + && let Some(err) = resp.error + { + colored_eprintln!("{}", err.uri); + break; + } + } + Err(e) => { + colored_eprintln!( + "Session {} Iteration {} Publish Error: {}", + session_id, + iteration, + e + ); + break; + } } } diff --git a/src/commands/register.rs b/src/commands/register.rs index 640641e..8ad09d2 100644 --- a/src/commands/register.rs +++ b/src/commands/register.rs @@ -1,3 +1,5 @@ +use crate::colored_eprintln; +use crate::colored_println; use crate::utils::{CommandOutput, wamp_async_value_to_serde}; use tokio::signal; use xconn::async_::session::Session; @@ -25,13 +27,22 @@ pub async fn handle(session: &Session, procedure: &str) -> Result<(), Box println!("Registered procedure '{}'", procedure), - Err(e) => eprintln!("Error registering procedure: {}", e), + Ok(resp) => { + if let Some(err) = resp.error { + colored_eprintln!("{}", err.uri); + return Ok(()); + } + colored_println!("Registered procedure '{}'", procedure); + } + Err(e) => { + colored_eprintln!("Error registering procedure: {}", e); + return Ok(()); + } } - println!("Press Ctrl+C to exit"); + colored_println!("Press Ctrl+C to exit"); signal::ctrl_c().await?; - println!("Exiting..."); + colored_println!("Exiting..."); Ok(()) } diff --git a/src/commands/subscribe.rs b/src/commands/subscribe.rs index 69799a1..1dddc26 100644 --- a/src/commands/subscribe.rs +++ b/src/commands/subscribe.rs @@ -1,5 +1,7 @@ +use crate::colored_eprintln; +use crate::colored_println; use crate::config::{ConnectionConfig, SubscribeConfig}; -use crate::utils::{CommandOutput, wamp_async_value_to_serde}; +use crate::utils::{CommandOutput, format_connect_error, wamp_async_value_to_serde}; use std::sync::Arc; use tokio::signal; use tokio::sync::Semaphore; @@ -38,7 +40,10 @@ async fn run_session( let session = match conn_config.connect().await { Ok(s) => s, Err(e) => { - eprintln!("Session {} Connection Error: {}", session_id, e); + colored_eprintln!( + "{}", + format_connect_error(session_id, subscribe_config.parallel, e.as_ref()) + ); return; } }; @@ -46,18 +51,26 @@ async fn run_session( let request = build_subscribe_request(&subscribe_config); match session.subscribe(request).await { - Ok(_) => { + Ok(resp) => { + if let Some(err) = resp.error { + colored_eprintln!("{}", err.uri); + let _ = session.leave().await; + return; + } + if subscribe_config.parallel > 1 { - println!( + colored_println!( "Session {}: Subscribed to topic '{}'", - session_id, subscribe_config.topic + session_id, + subscribe_config.topic ); } else { - println!("Subscribed to topic '{}'", subscribe_config.topic); + colored_println!("Subscribed to topic '{}'", subscribe_config.topic); } } Err(e) => { - eprintln!("Session {} Subscribe Error: {}", session_id, e); + colored_eprintln!("Session {} Subscribe Error: {}", session_id, e); + let _ = session.leave().await; return; } } @@ -67,7 +80,7 @@ async fn run_session( let _ = shutdown.changed().await; if let Err(e) = session.leave().await { - eprintln!("Session {} Error leaving: {}", session_id, e); + colored_eprintln!("Session {} Error leaving: {}", session_id, e); } } @@ -98,9 +111,9 @@ pub async fn handle( handles.push(handle); } - println!("Press Ctrl+C to exit"); + colored_println!("Press Ctrl+C to exit"); signal::ctrl_c().await?; - println!("Exiting..."); + colored_println!("Exiting..."); // Signal all sessions to shutdown let _ = shutdown_tx.send(true); diff --git a/src/utils.rs b/src/utils.rs index db3e320..f28718c 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -2,6 +2,52 @@ use serde::Serialize; use serde_json::Value as SerdeValue; use xconn::sync::Value as WampValue; +#[macro_export] +macro_rules! colored_eprintln { + ($($arg:tt)*) => {{ + const RED: &str = "\x1b[31m"; + const RESET: &str = "\x1b[0m"; + eprintln!("{}[ERROR]{} {}", RED, RESET, format!($($arg)*)); + }}; +} + +#[macro_export] +macro_rules! colored_println { + ($($arg:tt)*) => {{ + const BLUE: &str = "\x1b[34m"; + const RESET: &str = "\x1b[0m"; + println!("{}[INFO]{} {}", BLUE, RESET, format!($($arg)*)); + }}; +} + +pub fn format_connect_error( + session_id: u32, + parallel: u32, + error: &dyn std::error::Error, +) -> String { + fn capitalize_first(mut message: String) -> String { + let Some(first) = message.chars().next() else { + return message; + }; + if first.is_ascii_lowercase() { + message.replace_range(0..first.len_utf8(), &first.to_ascii_uppercase().to_string()); + } + message + } + + let mut root = error; + while let Some(src) = root.source() { + root = src; + } + + let message = capitalize_first(root.to_string()); + if parallel == 1 { + message + } else { + format!("Session {}: {}", session_id, message) + } +} + #[derive(Debug)] pub enum ParsedArg { Integer(i64),