Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.149"
tokio = { version = "1.49.0", features = ["full"] }
wampproto = { git = "https://github.com/xconnio/wampproto-rust.git", rev = "520130fa02343409578879959748b36f151bbc8d" }
xconn = { git = "https://github.com/xconnio/xconn-rust.git", rev = "484b2deade287937d3c3a65b6ad9d8b2ae2dd2d1" }
xconn = { git = "https://github.com/xconnio/xconn-rust.git", rev = "2bf55ac6f0e2a7e8a3bb6212f74bf6b87a3d54d5" }

37 changes: 28 additions & 9 deletions src/commands/call.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::colored_eprintln;
use crate::config::{CallConfig, ConnectionConfig};
use crate::utils::{CommandOutput, ParsedArg, parse_arg, wamp_value_to_serde};
use crate::utils::{
CommandOutput, ParsedArg, format_connect_error, parse_arg, wamp_value_to_serde,
};
use std::sync::Arc;
use tokio::sync::Semaphore;
use xconn::sync::CallRequest;
Expand Down Expand Up @@ -64,7 +67,11 @@ async fn run_session(
let session = match conn_config.connect().await {
Ok(s) => s,
Err(e) => {
eprintln!("Session {} Connection Error: {}", session_id, e);
eprintln!("{}", e);
colored_eprintln!(
"{}",
format_connect_error(session_id, call_config.parallel, e.as_ref())
);
return;
}
};
Expand All @@ -74,6 +81,11 @@ async fn run_session(

match session.call(request).await {
Ok(result) => {
if let Some(err) = result.error {
colored_eprintln!("{}", err.uri);
break;
}

let output = CommandOutput {
args: result
.args
Expand All @@ -96,21 +108,28 @@ async fn run_session(
};
match serde_json::to_string_pretty(&output) {
Ok(json) => println!("{}", json),
Err(e) => eprintln!(
Err(e) => colored_eprintln!(
"Session {} Iteration {} Error serializing result: {}",
session_id, iteration, e
session_id,
iteration,
e
),
}
}
Err(e) => eprintln!(
"Session {} Iteration {} Call Error: {}",
session_id, iteration, e
),
Err(e) => {
colored_eprintln!(
"Session {} Iteration {} Call Error: {}",
session_id,
iteration,
e
);
break;
}
}
}

if let Err(e) = session.leave().await {
eprintln!("Session {} Error leaving: {}", session_id, e);
colored_eprintln!("Session {} Error leaving: {}", session_id, e);
}
}

Expand Down
30 changes: 23 additions & 7 deletions src/commands/publish.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::colored_eprintln;
use crate::config::{ConnectionConfig, PublishConfig};
use crate::utils::{ParsedArg, parse_arg};
use crate::utils::{ParsedArg, format_connect_error, parse_arg};
use std::sync::Arc;
use tokio::sync::Semaphore;
use xconn::sync::PublishRequest;
Expand Down Expand Up @@ -69,7 +70,10 @@ async fn run_session(
let session = match conn_config.connect().await {
Ok(s) => s,
Err(e) => {
eprintln!("Session {} Connection Error: {}", session_id, e);
colored_eprintln!(
"{}",
format_connect_error(session_id, publish_config.parallel, e.as_ref())
);
return;
}
};
Expand All @@ -78,11 +82,23 @@ async fn run_session(
let request = build_publish_request(&publish_config);

match session.publish(request).await {
Ok(_) => {}
Err(e) => eprintln!(
"Session {} Iteration {} Publish Error: {}",
session_id, iteration, e
),
Ok(response) => {
if let Some(resp) = response
&& let Some(err) = resp.error
{
colored_eprintln!("{}", err.uri);
break;
}
}
Err(e) => {
colored_eprintln!(
"Session {} Iteration {} Publish Error: {}",
session_id,
iteration,
e
);
break;
}
}
}

Expand Down
19 changes: 15 additions & 4 deletions src/commands/register.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::colored_eprintln;
use crate::colored_println;
use crate::utils::{CommandOutput, wamp_async_value_to_serde};
use tokio::signal;
use xconn::async_::session::Session;
Expand Down Expand Up @@ -25,13 +27,22 @@ pub async fn handle(session: &Session, procedure: &str) -> Result<(), Box<dyn st
let register_request = RegisterRequest::new(procedure, registration_handler);

match session.register(register_request).await {
Ok(_) => println!("Registered procedure '{}'", procedure),
Err(e) => eprintln!("Error registering procedure: {}", e),
Ok(resp) => {
if let Some(err) = resp.error {
colored_eprintln!("{}", err.uri);
return Ok(());
}
colored_println!("Registered procedure '{}'", procedure);
}
Err(e) => {
colored_eprintln!("Error registering procedure: {}", e);
return Ok(());
}
}

println!("Press Ctrl+C to exit");
colored_println!("Press Ctrl+C to exit");
signal::ctrl_c().await?;
println!("Exiting...");
colored_println!("Exiting...");

Ok(())
}
33 changes: 23 additions & 10 deletions src/commands/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::colored_eprintln;
use crate::colored_println;
use crate::config::{ConnectionConfig, SubscribeConfig};
use crate::utils::{CommandOutput, wamp_async_value_to_serde};
use crate::utils::{CommandOutput, format_connect_error, wamp_async_value_to_serde};
use std::sync::Arc;
use tokio::signal;
use tokio::sync::Semaphore;
Expand Down Expand Up @@ -38,26 +40,37 @@ async fn run_session(
let session = match conn_config.connect().await {
Ok(s) => s,
Err(e) => {
eprintln!("Session {} Connection Error: {}", session_id, e);
colored_eprintln!(
"{}",
format_connect_error(session_id, subscribe_config.parallel, e.as_ref())
);
return;
}
};

let request = build_subscribe_request(&subscribe_config);

match session.subscribe(request).await {
Ok(_) => {
Ok(resp) => {
if let Some(err) = resp.error {
colored_eprintln!("{}", err.uri);
let _ = session.leave().await;
return;
}

if subscribe_config.parallel > 1 {
println!(
colored_println!(
"Session {}: Subscribed to topic '{}'",
session_id, subscribe_config.topic
session_id,
subscribe_config.topic
);
} else {
println!("Subscribed to topic '{}'", subscribe_config.topic);
colored_println!("Subscribed to topic '{}'", subscribe_config.topic);
}
}
Err(e) => {
eprintln!("Session {} Subscribe Error: {}", session_id, e);
colored_eprintln!("Session {} Subscribe Error: {}", session_id, e);
let _ = session.leave().await;
return;
}
}
Expand All @@ -67,7 +80,7 @@ async fn run_session(
let _ = shutdown.changed().await;

if let Err(e) = session.leave().await {
eprintln!("Session {} Error leaving: {}", session_id, e);
colored_eprintln!("Session {} Error leaving: {}", session_id, e);
}
}

Expand Down Expand Up @@ -98,9 +111,9 @@ pub async fn handle(
handles.push(handle);
}

println!("Press Ctrl+C to exit");
colored_println!("Press Ctrl+C to exit");
signal::ctrl_c().await?;
println!("Exiting...");
colored_println!("Exiting...");

// Signal all sessions to shutdown
let _ = shutdown_tx.send(true);
Expand Down
46 changes: 46 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,52 @@ use serde::Serialize;
use serde_json::Value as SerdeValue;
use xconn::sync::Value as WampValue;

#[macro_export]
macro_rules! colored_eprintln {
($($arg:tt)*) => {{
const RED: &str = "\x1b[31m";
const RESET: &str = "\x1b[0m";
eprintln!("{}[ERROR]{} {}", RED, RESET, format!($($arg)*));
}};
}

#[macro_export]
macro_rules! colored_println {
($($arg:tt)*) => {{
const BLUE: &str = "\x1b[34m";
const RESET: &str = "\x1b[0m";
println!("{}[INFO]{} {}", BLUE, RESET, format!($($arg)*));
}};
}

pub fn format_connect_error(
session_id: u32,
parallel: u32,
error: &dyn std::error::Error,
) -> String {
fn capitalize_first(mut message: String) -> String {
let Some(first) = message.chars().next() else {
return message;
};
if first.is_ascii_lowercase() {
message.replace_range(0..first.len_utf8(), &first.to_ascii_uppercase().to_string());
}
message
}

let mut root = error;
while let Some(src) = root.source() {
root = src;
}

let message = capitalize_first(root.to_string());
if parallel == 1 {
message
} else {
format!("Session {}: {}", session_id, message)
}
}

#[derive(Debug)]
pub enum ParsedArg {
Integer(i64),
Expand Down
Loading