Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions clients/rust/prism-client/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions clients/rust/prism-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ tempfile = "3"
rand = "0.8"
chrono = "0.4"
env_logger = "0.11"
serde_json = "1.0"
base64 = "0.22"

[build-dependencies]
tonic-build = "0.11"
Expand Down
193 changes: 170 additions & 23 deletions clients/rust/prism-client/examples/canary.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
//! Prism Client Canary Test
//!
//! Continuously tests the Prism client SDK against a live proxy.
//! Uses a TTL-based key to determine runtime and crashes if tests fail.
//! Uses key/value as source of truth for pub/sub topic configuration.
//!
//! ## Behavior:
//! 1. Creates a TTL key ("canary:ttl") with random expiration (30-60s)
//! 2. Publishes messages once per second to "canary/heartbeat"
//! 3. Verifies each published message is received
//! 4. Exits gracefully when TTL key expires
//! 5. CRASHES if test runs 2x TTL time without key expiring (indicates problem)
//! 2. Stores pub/sub topic name in key/value ("canary:config:topic")
//! 3. Publishes messages with random data to the topic from KV
//! 4. Subscribes and validates received messages match published data
//! 5. Exits gracefully when TTL key expires
//! 6. CRASHES if test runs 2x TTL time without key expiring (indicates problem)
//! 7. CRASHES if message validation fails (payload mismatch)
//!
//! ## Environment Variables:
//! - `PRISM_ENDPOINT` - Proxy endpoint (default: localhost:8980)
Expand Down Expand Up @@ -39,10 +41,13 @@ impl CanaryConfig {
}
}

/// Statistics tracker for canary test execution
struct CanaryStats {
start_time: Instant,
messages_published: u64,
messages_received: u64,
messages_validated: u64,
validation_failures: u64,
errors: u64,
last_check_time: Instant,
}
Expand All @@ -54,6 +59,8 @@ impl CanaryStats {
start_time: now,
messages_published: 0,
messages_received: 0,
messages_validated: 0,
validation_failures: 0,
errors: 0,
last_check_time: now,
}
Expand All @@ -68,15 +75,45 @@ impl CanaryStats {
println!(" ⏱️ Uptime: {:.1}s", self.uptime().as_secs_f64());
println!(" 📤 Messages Published: {}", self.messages_published);
println!(" 📥 Messages Received: {}", self.messages_received);
println!(" ❌ Errors: {}", self.errors);
println!(" ✅ Messages Validated: {}", self.messages_validated);
println!(" ❌ Validation Failures: {}", self.validation_failures);
println!(" ⚠️ Errors: {}", self.errors);
if self.messages_published > 0 {
let success_rate =
(self.messages_received as f64 / self.messages_published as f64) * 100.0;
println!(" Success Rate: {:.1}%", success_rate);
(self.messages_validated as f64 / self.messages_published as f64) * 100.0;
println!(" 📈 Success Rate: {:.1}%", success_rate);
}
}
}

/// Represents a message we published and expect to receive
#[derive(Debug, Clone)]
struct PublishedMessage {
id: String,
payload: Vec<u8>,
timestamp: String,
published_at: Instant,
}

/// Generate random data for message payload
fn generate_random_data(rng: &mut rand::rngs::ThreadRng, size: usize) -> Vec<u8> {
use rand::Rng;
(0..size).map(|_| rng.gen::<u8>()).collect()
}

/// Create a JSON payload with message metadata
fn create_message_payload(msg: &PublishedMessage, random_data: &[u8]) -> Vec<u8> {
use std::collections::HashMap;
let mut data = HashMap::new();
data.insert("id", msg.id.clone());
data.insert("timestamp", msg.timestamp.clone());
data.insert(
"data",
base64::Engine::encode(&base64::engine::general_purpose::STANDARD, random_data),
);
serde_json::to_vec(&data).unwrap_or_default()
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
// Setup logging
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
Expand Down Expand Up @@ -117,9 +154,25 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
kv_client.set(ttl_key, ttl_seconds.to_string().as_bytes())?;
println!("✅ TTL key created");

// Store pub/sub topic configuration in KV (source of truth)
let config_topic_key = "config:topic";
let heartbeat_topic = "heartbeat";
kv_client.set(config_topic_key, heartbeat_topic.as_bytes())?;
println!("\n📝 Stored topic configuration in KV:");
println!(" 🔑 Key: {}", config_topic_key);
println!(" 📢 Topic: {}", heartbeat_topic);

// Verify we can read it back
let stored_topic = kv_client
.get(config_topic_key)?
.ok_or("Failed to read topic from KV")?;
let topic_name = String::from_utf8(stored_topic)?;
println!(" ✅ Verified topic from KV: {}", topic_name);

let mut stats = CanaryStats::new();
let start_time = Instant::now();
let heartbeat_topic = "heartbeat";
let mut pending_messages: std::collections::HashMap<String, PublishedMessage> =
std::collections::HashMap::new();

println!(
"\n🔁 Starting canary loop (publishing every {}s)...\n",
Expand Down Expand Up @@ -170,40 +223,129 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

// Create unique message ID
// Read topic name from KV (demonstrating KV as source of truth)
let current_topic = match kv_client.get(config_topic_key) {
Ok(Some(topic_bytes)) => {
String::from_utf8(topic_bytes).unwrap_or_else(|_| topic_name.clone())
}
Ok(None) => {
eprintln!("⚠️ Topic configuration missing from KV, using default");
topic_name.clone()
}
Err(e) => {
eprintln!("⚠️ Error reading topic from KV: {}, using cached value", e);
stats.errors += 1;
topic_name.clone()
}
};

// Generate random message data
let message_id = format!("canary-{}-{}", elapsed.as_secs(), stats.messages_published);
let timestamp = chrono::Utc::now().to_rfc3339();
let data_size = rng.gen_range(64..=256);
let random_data = generate_random_data(&mut rng, data_size);

let published_msg = PublishedMessage {
id: message_id.clone(),
payload: random_data.clone(),
timestamp: timestamp.clone(),
published_at: Instant::now(),
};

let payload = create_message_payload(&published_msg, &random_data);

// Publish message
match producer.publish_with_metadata(
heartbeat_topic,
message_id.as_bytes(),
&current_topic,
&payload,
vec![
("message_id", &message_id),
("timestamp", &timestamp),
("uptime_secs", &elapsed.as_secs().to_string()),
("data_size", &random_data.len().to_string()),
],
) {
Ok(_) => {
stats.messages_published += 1;
print!("📤 Published: {} ", message_id);
pending_messages.insert(message_id.clone(), published_msg);
print!(
"📤 Published: {} ({} bytes) ",
message_id,
random_data.len()
);
}
Err(e) => {
stats.errors += 1;
eprintln!("❌ Publish failed: {}", e);
}
}

// Try to consume the message
// Try to consume and validate the message
match consumer.receive() {
Ok(message) => {
stats.messages_received += 1;
let received_id = String::from_utf8_lossy(&message.payload);
println!("📥 Received: {}", received_id);

// Acknowledge
if let Err(e) = message.ack() {
eprintln!("⚠️ Failed to ack message: {}", e);
// Try to parse the message
if let Ok(parsed) = serde_json::from_slice::<
std::collections::HashMap<String, String>,
>(&message.payload)
{
if let Some(received_id) = parsed.get("id") {
print!("📥 Received: {} ", received_id);

// Validate against pending messages
if let Some(expected_msg) = pending_messages.remove(received_id) {
// Validate payload
if let (Some(received_data_b64), Some(expected_ts)) =
(parsed.get("data"), parsed.get("timestamp"))
{
if let Ok(received_data) = base64::Engine::decode(
&base64::engine::general_purpose::STANDARD,
received_data_b64,
) {
if received_data == expected_msg.payload
&& expected_ts == &expected_msg.timestamp
{
stats.messages_validated += 1;
let latency = expected_msg.published_at.elapsed();
println!(
"✅ Validated (latency: {:.1}ms)",
latency.as_millis()
);
} else {
stats.validation_failures += 1;
println!(
"❌ Validation FAILED: payload or timestamp mismatch"
);
if stats.validation_failures > 3 {
stats.print_summary();
eprintln!("\n💥 CRASH: Too many validation failures!");
std::process::exit(1);
}
Comment on lines +320 to +324
Copy link

Copilot AI Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic number '3' for maximum validation failures should be extracted as a constant (e.g., const MAX_VALIDATION_FAILURES: u64 = 3) at the module level for better maintainability.

Copilot uses AI. Check for mistakes.
}
} else {
println!("⚠️ Failed to decode base64 data");
stats.errors += 1;
}
} else {
println!("⚠️ Missing data or timestamp in message");
stats.errors += 1;
}
} else {
println!("⚠️ Received unexpected message ID");
}

// Acknowledge
if let Err(e) = message.ack() {
eprintln!("⚠️ Failed to ack message: {}", e);
stats.errors += 1;
}
} else {
println!("⚠️ Message missing ID field");
stats.errors += 1;
}
} else {
println!("⚠️ Failed to parse message JSON");
stats.errors += 1;
}
}
Expand All @@ -213,14 +355,19 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

// Clean up old pending messages (> 10s old)
let now = Instant::now();
pending_messages
.retain(|_, msg| now.duration_since(msg.published_at) < Duration::from_secs(10));

// Print periodic status
#[allow(clippy::manual_is_multiple_of)]
if elapsed.as_secs() % 10 == 0 && stats.messages_published > 0 {
if elapsed.as_secs().is_multiple_of(10) && stats.messages_published > 0 {
let time_remaining = ttl_duration.saturating_sub(elapsed);
println!(
"\n📊 Status: {} published, {} received | TTL remaining: ~{}s\n",
"\n📊 Status: {} published, {} validated, {} pending | TTL remaining: ~{}s\n",
stats.messages_published,
stats.messages_received,
stats.messages_validated,
pending_messages.len(),
time_remaining.as_secs()
);
}
Expand Down
Loading