diff --git a/clients/rust/prism-client/Cargo.lock b/clients/rust/prism-client/Cargo.lock index 699de417f..2ea606f92 100644 --- a/clients/rust/prism-client/Cargo.lock +++ b/clients/rust/prism-client/Cargo.lock @@ -166,6 +166,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bitflags" version = "1.3.2" @@ -1040,6 +1046,7 @@ dependencies = [ name = "prism-client" version = "0.1.0" dependencies = [ + "base64 0.22.1", "chrono", "env_logger", "futures", @@ -1047,6 +1054,7 @@ dependencies = [ "rand", "reqwest", "serde", + "serde_json", "serde_yaml", "tempfile", "thiserror", @@ -1200,7 +1208,7 @@ version = "0.11.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" dependencies = [ - "base64", + "base64 0.21.7", "bytes", "encoding_rs", "futures-core", @@ -1253,7 +1261,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64", + "base64 0.21.7", ] [[package]] @@ -1596,7 +1604,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64", + "base64 0.21.7", "bytes", "h2", "http", diff --git a/clients/rust/prism-client/Cargo.toml b/clients/rust/prism-client/Cargo.toml index 28e866c63..d69777477 100644 --- a/clients/rust/prism-client/Cargo.toml +++ b/clients/rust/prism-client/Cargo.toml @@ -41,6 +41,8 @@ tempfile = "3" rand = "0.8" chrono = "0.4" env_logger = "0.11" +serde_json = "1.0" +base64 = "0.22" [build-dependencies] tonic-build = "0.11" diff --git a/clients/rust/prism-client/examples/canary.rs b/clients/rust/prism-client/examples/canary.rs index ac83f511d..c0315dacf 100644 --- a/clients/rust/prism-client/examples/canary.rs +++ b/clients/rust/prism-client/examples/canary.rs @@ -1,14 +1,16 @@ //! Prism Client Canary Test //! //! Continuously tests the Prism client SDK against a live proxy. -//! Uses a TTL-based key to determine runtime and crashes if tests fail. +//! Uses key/value as source of truth for pub/sub topic configuration. //! //! ## Behavior: //! 1. Creates a TTL key ("canary:ttl") with random expiration (30-60s) -//! 2. Publishes messages once per second to "canary/heartbeat" -//! 3. Verifies each published message is received -//! 4. Exits gracefully when TTL key expires -//! 5. CRASHES if test runs 2x TTL time without key expiring (indicates problem) +//! 2. Stores pub/sub topic name in key/value ("canary:config:topic") +//! 3. Publishes messages with random data to the topic from KV +//! 4. Subscribes and validates received messages match published data +//! 5. Exits gracefully when TTL key expires +//! 6. CRASHES if test runs 2x TTL time without key expiring (indicates problem) +//! 7. CRASHES if message validation fails (payload mismatch) //! //! ## Environment Variables: //! - `PRISM_ENDPOINT` - Proxy endpoint (default: localhost:8980) @@ -39,10 +41,13 @@ impl CanaryConfig { } } +/// Statistics tracker for canary test execution struct CanaryStats { start_time: Instant, messages_published: u64, messages_received: u64, + messages_validated: u64, + validation_failures: u64, errors: u64, last_check_time: Instant, } @@ -54,6 +59,8 @@ impl CanaryStats { start_time: now, messages_published: 0, messages_received: 0, + messages_validated: 0, + validation_failures: 0, errors: 0, last_check_time: now, } @@ -68,15 +75,45 @@ impl CanaryStats { println!(" ā±ļø Uptime: {:.1}s", self.uptime().as_secs_f64()); println!(" šŸ“¤ Messages Published: {}", self.messages_published); println!(" šŸ“„ Messages Received: {}", self.messages_received); - println!(" āŒ Errors: {}", self.errors); + println!(" āœ… Messages Validated: {}", self.messages_validated); + println!(" āŒ Validation Failures: {}", self.validation_failures); + println!(" āš ļø Errors: {}", self.errors); if self.messages_published > 0 { let success_rate = - (self.messages_received as f64 / self.messages_published as f64) * 100.0; - println!(" āœ… Success Rate: {:.1}%", success_rate); + (self.messages_validated as f64 / self.messages_published as f64) * 100.0; + println!(" šŸ“ˆ Success Rate: {:.1}%", success_rate); } } } +/// Represents a message we published and expect to receive +#[derive(Debug, Clone)] +struct PublishedMessage { + id: String, + payload: Vec, + timestamp: String, + published_at: Instant, +} + +/// Generate random data for message payload +fn generate_random_data(rng: &mut rand::rngs::ThreadRng, size: usize) -> Vec { + use rand::Rng; + (0..size).map(|_| rng.gen::()).collect() +} + +/// Create a JSON payload with message metadata +fn create_message_payload(msg: &PublishedMessage, random_data: &[u8]) -> Vec { + use std::collections::HashMap; + let mut data = HashMap::new(); + data.insert("id", msg.id.clone()); + data.insert("timestamp", msg.timestamp.clone()); + data.insert( + "data", + base64::Engine::encode(&base64::engine::general_purpose::STANDARD, random_data), + ); + serde_json::to_vec(&data).unwrap_or_default() +} + fn main() -> Result<(), Box> { // Setup logging env_logger::init_from_env(env_logger::Env::default().default_filter_or("info")); @@ -117,9 +154,25 @@ fn main() -> Result<(), Box> { kv_client.set(ttl_key, ttl_seconds.to_string().as_bytes())?; println!("āœ… TTL key created"); + // Store pub/sub topic configuration in KV (source of truth) + let config_topic_key = "config:topic"; + let heartbeat_topic = "heartbeat"; + kv_client.set(config_topic_key, heartbeat_topic.as_bytes())?; + println!("\nšŸ“ Stored topic configuration in KV:"); + println!(" šŸ”‘ Key: {}", config_topic_key); + println!(" šŸ“¢ Topic: {}", heartbeat_topic); + + // Verify we can read it back + let stored_topic = kv_client + .get(config_topic_key)? + .ok_or("Failed to read topic from KV")?; + let topic_name = String::from_utf8(stored_topic)?; + println!(" āœ… Verified topic from KV: {}", topic_name); + let mut stats = CanaryStats::new(); let start_time = Instant::now(); - let heartbeat_topic = "heartbeat"; + let mut pending_messages: std::collections::HashMap = + std::collections::HashMap::new(); println!( "\nšŸ” Starting canary loop (publishing every {}s)...\n", @@ -170,23 +223,56 @@ fn main() -> Result<(), Box> { } } - // Create unique message ID + // Read topic name from KV (demonstrating KV as source of truth) + let current_topic = match kv_client.get(config_topic_key) { + Ok(Some(topic_bytes)) => { + String::from_utf8(topic_bytes).unwrap_or_else(|_| topic_name.clone()) + } + Ok(None) => { + eprintln!("āš ļø Topic configuration missing from KV, using default"); + topic_name.clone() + } + Err(e) => { + eprintln!("āš ļø Error reading topic from KV: {}, using cached value", e); + stats.errors += 1; + topic_name.clone() + } + }; + + // Generate random message data let message_id = format!("canary-{}-{}", elapsed.as_secs(), stats.messages_published); let timestamp = chrono::Utc::now().to_rfc3339(); + let data_size = rng.gen_range(64..=256); + let random_data = generate_random_data(&mut rng, data_size); + + let published_msg = PublishedMessage { + id: message_id.clone(), + payload: random_data.clone(), + timestamp: timestamp.clone(), + published_at: Instant::now(), + }; + + let payload = create_message_payload(&published_msg, &random_data); // Publish message match producer.publish_with_metadata( - heartbeat_topic, - message_id.as_bytes(), + ¤t_topic, + &payload, vec![ ("message_id", &message_id), ("timestamp", ×tamp), ("uptime_secs", &elapsed.as_secs().to_string()), + ("data_size", &random_data.len().to_string()), ], ) { Ok(_) => { stats.messages_published += 1; - print!("šŸ“¤ Published: {} ", message_id); + pending_messages.insert(message_id.clone(), published_msg); + print!( + "šŸ“¤ Published: {} ({} bytes) ", + message_id, + random_data.len() + ); } Err(e) => { stats.errors += 1; @@ -194,16 +280,72 @@ fn main() -> Result<(), Box> { } } - // Try to consume the message + // Try to consume and validate the message match consumer.receive() { Ok(message) => { stats.messages_received += 1; - let received_id = String::from_utf8_lossy(&message.payload); - println!("šŸ“„ Received: {}", received_id); - // Acknowledge - if let Err(e) = message.ack() { - eprintln!("āš ļø Failed to ack message: {}", e); + // Try to parse the message + if let Ok(parsed) = serde_json::from_slice::< + std::collections::HashMap, + >(&message.payload) + { + if let Some(received_id) = parsed.get("id") { + print!("šŸ“„ Received: {} ", received_id); + + // Validate against pending messages + if let Some(expected_msg) = pending_messages.remove(received_id) { + // Validate payload + if let (Some(received_data_b64), Some(expected_ts)) = + (parsed.get("data"), parsed.get("timestamp")) + { + if let Ok(received_data) = base64::Engine::decode( + &base64::engine::general_purpose::STANDARD, + received_data_b64, + ) { + if received_data == expected_msg.payload + && expected_ts == &expected_msg.timestamp + { + stats.messages_validated += 1; + let latency = expected_msg.published_at.elapsed(); + println!( + "āœ… Validated (latency: {:.1}ms)", + latency.as_millis() + ); + } else { + stats.validation_failures += 1; + println!( + "āŒ Validation FAILED: payload or timestamp mismatch" + ); + if stats.validation_failures > 3 { + stats.print_summary(); + eprintln!("\nšŸ’„ CRASH: Too many validation failures!"); + std::process::exit(1); + } + } + } else { + println!("āš ļø Failed to decode base64 data"); + stats.errors += 1; + } + } else { + println!("āš ļø Missing data or timestamp in message"); + stats.errors += 1; + } + } else { + println!("āš ļø Received unexpected message ID"); + } + + // Acknowledge + if let Err(e) = message.ack() { + eprintln!("āš ļø Failed to ack message: {}", e); + stats.errors += 1; + } + } else { + println!("āš ļø Message missing ID field"); + stats.errors += 1; + } + } else { + println!("āš ļø Failed to parse message JSON"); stats.errors += 1; } } @@ -213,14 +355,19 @@ fn main() -> Result<(), Box> { } } + // Clean up old pending messages (> 10s old) + let now = Instant::now(); + pending_messages + .retain(|_, msg| now.duration_since(msg.published_at) < Duration::from_secs(10)); + // Print periodic status - #[allow(clippy::manual_is_multiple_of)] - if elapsed.as_secs() % 10 == 0 && stats.messages_published > 0 { + if elapsed.as_secs().is_multiple_of(10) && stats.messages_published > 0 { let time_remaining = ttl_duration.saturating_sub(elapsed); println!( - "\nšŸ“Š Status: {} published, {} received | TTL remaining: ~{}s\n", + "\nšŸ“Š Status: {} published, {} validated, {} pending | TTL remaining: ~{}s\n", stats.messages_published, - stats.messages_received, + stats.messages_validated, + pending_messages.len(), time_remaining.as_secs() ); } diff --git a/clients/rust/prism-client/src/patterns/consumer.rs b/clients/rust/prism-client/src/patterns/consumer.rs index 509b80980..4dfe494ed 100644 --- a/clients/rust/prism-client/src/patterns/consumer.rs +++ b/clients/rust/prism-client/src/patterns/consumer.rs @@ -64,8 +64,26 @@ impl Consumer { } /// Receive a single message (blocking) + /// + /// This method blocks the current thread until a message is received. + /// For non-blocking operations, use the async API instead. + /// + /// # Errors + /// + /// Returns an error if the connection fails or the stream ends without a message. + /// + /// # Example + /// + /// ```no_run + /// # use prism_client::Client; + /// let client = Client::connect("localhost:8980")?; + /// let consumer = client.consumer("orders")?; + /// let message = consumer.receive()?; + /// println!("Received: {:?}", message.payload); + /// message.ack()?; + /// # Ok::<(), Box>(()) + /// ``` pub fn receive(&self) -> Result { - // TODO: Block on async receive self.runtime.block_on(self.async_consumer.receive()) } } @@ -185,7 +203,30 @@ impl AsyncConsumer { })) } - /// Receive a single message + /// Receive a single message from the default namespace topic + /// + /// This method subscribes to the namespace (using it as the topic) and + /// returns the first message received. For production use, prefer using + /// `subscribe()` with explicit topics for better control. + /// + /// # Errors + /// + /// Returns `TransientError::Status` if the gRPC connection fails. + /// Returns `PermanentError::InvalidData` if the stream ends without a message. + /// + /// # Example + /// + /// ```no_run + /// # use prism_client::r#async::AsyncClient; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box> { + /// let client = AsyncClient::connect_simple("localhost:8980").await?; + /// let consumer = client.consumer("orders").await?; + /// let message = consumer.receive().await?; + /// println!("Received: {:?}", message.payload); + /// # Ok(()) + /// # } + /// ``` pub async fn receive(&self) -> Result { use futures::StreamExt; @@ -204,24 +245,21 @@ impl AsyncConsumer { .into_inner(); // Get the first message from the stream - if let Some(msg) = stream.next().await { - let proto_msg = - msg.map_err(|e| Box::new(crate::error::TransientError::Status(e).into()))?; - - Ok(Message { + match stream.next().await { + Some(Ok(proto_msg)) => Ok(Message { payload: proto_msg.payload, metadata: proto_msg.metadata.into_iter().collect(), id: proto_msg.message_id, runtime: None, - }) - } else { - Err(Box::new( + }), + Some(Err(e)) => Err(Box::new(crate::error::TransientError::Status(e).into())), + None => Err(Box::new( crate::error::PermanentError::InvalidData { field: "subscribe".to_string(), - reason: "No message received from stream".to_string(), + reason: "Stream ended without producing a message".to_string(), } .into(), - )) + )), } } }