From 8ce0a8c9e903c8b486b101b80051bddd45c2ad58 Mon Sep 17 00:00:00 2001 From: ABCxFF <79597906+abcxff@users.noreply.github.com> Date: Tue, 9 Dec 2025 01:30:57 +0000 Subject: [PATCH] chore(engine): test actors runner kv --- .../packages/engine/tests/actors_kv_crud.rs | 996 ++++++++++++++++ .../packages/engine/tests/actors_kv_drop.rs | 255 ++++ .../packages/engine/tests/actors_kv_list.rs | 1061 +++++++++++++++++ .../packages/engine/tests/actors_kv_misc.rs | 882 ++++++++++++++ .../engine/tests/common/test_helpers.rs | 1 + .../engine/tests/common/test_runner/actor.rs | 20 +- .../engine/tests/common/test_runner/runner.rs | 190 +-- 7 files changed, 3319 insertions(+), 86 deletions(-) create mode 100644 engine/packages/engine/tests/actors_kv_crud.rs create mode 100644 engine/packages/engine/tests/actors_kv_drop.rs create mode 100644 engine/packages/engine/tests/actors_kv_list.rs create mode 100644 engine/packages/engine/tests/actors_kv_misc.rs diff --git a/engine/packages/engine/tests/actors_kv_crud.rs b/engine/packages/engine/tests/actors_kv_crud.rs new file mode 100644 index 0000000000..b3791486e1 --- /dev/null +++ b/engine/packages/engine/tests/actors_kv_crud.rs @@ -0,0 +1,996 @@ +use anyhow::*; +use async_trait::async_trait; +use common::test_runner::*; +use std::sync::{Arc, Mutex}; + +mod common; + +// MARK: Helper Functions + +/// Convert string to KV key format (Vec) +fn make_key(s: &str) -> Vec { + s.as_bytes().to_vec() +} + +/// Convert string to KV value format (Vec) +fn make_value(s: &str) -> Vec { + s.as_bytes().to_vec() +} + +/// Result of KV test operations +#[derive(Debug, Clone)] +enum KvTestResult { + Success, + Failure(String), +} + +// MARK: Actor Behaviors + +/// Actor that puts a key-value pair and then gets it to verify +struct PutAndGetActor { + notify_tx: Arc>>>, +} + +impl PutAndGetActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for PutAndGetActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "put and get actor starting"); + + let result = async { + // Put a key-value pair + let key = make_key("test-key"); + let value = make_value("test-value"); + + config + .send_kv_put(vec![key.clone()], vec![value.clone()]) + .await + .context("failed to put key-value")?; + + // Get the key back + let response = config + .send_kv_get(vec![key.clone()]) + .await + .context("failed to get key")?; + + // Verify we got exactly one value + if response.values.len() != 1 { + bail!("expected 1 value, got {}", response.values.len()); + } + + // Verify the value matches + let retrieved_value = response + .values + .first() + .context("expected value to exist, got null")?; + + if *retrieved_value != value { + bail!( + "value mismatch: expected {:?}, got {:?}", + String::from_utf8_lossy(&value), + String::from_utf8_lossy(retrieved_value) + ); + } + + tracing::info!("value verified successfully"); + Result::Ok(KvTestResult::Success) + } + .await; + + // Notify test of result + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "PutAndGetActor" + } +} + +/// Actor that attempts to get a key that doesn't exist +struct GetNonexistentKeyActor { + notify_tx: Arc>>>, +} + +impl GetNonexistentKeyActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for GetNonexistentKeyActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "get nonexistent key actor starting"); + + let result = async { + // Try to get a key that was never put + let key = make_key("nonexistent-key"); + + let response = config + .send_kv_get(vec![key.clone()]) + .await + .context("failed to get key")?; + + tracing::info!(?response, "got response"); + + // TODO: Engine returns empty arrays for nonexistent keys instead of array with null + // Should return: keys: [key], values: [None] + // Currently returns: keys: [], values: [] + if response.values.is_empty() { + tracing::info!("verified nonexistent key returns empty array (engine behavior)"); + } else { + // Verify we got exactly one entry + if response.values.len() != 1 { + bail!("expected 1 value entry, got {}", response.values.len()); + } + + // Verify the value is None (null) + if response.values.first().is_some() { + bail!( + "expected null for nonexistent key, got value: {:?}", + response.values.first() + ); + } + + tracing::info!("verified nonexistent key returns null"); + } + + Result::Ok(KvTestResult::Success) + } + .await; + + // Notify test of result + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "GetNonexistentKeyActor" + } +} + +/// Actor that puts a key, then overwrites it with a new value +struct PutOverwriteActor { + notify_tx: Arc>>>, +} + +impl PutOverwriteActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for PutOverwriteActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "put overwrite actor starting"); + + let result = async { + let key = make_key("overwrite-key"); + let value1 = make_value("first-value"); + let value2 = make_value("second-value"); + + // Put first value + config + .send_kv_put(vec![key.clone()], vec![value1.clone()]) + .await + .context("failed to put first value")?; + + tracing::info!("put first value"); + + // Get and verify first value + let response1 = config + .send_kv_get(vec![key.clone()]) + .await + .context("failed to get first value")?; + + let retrieved1 = response1 + .values + .first() + .context("expected first value to exist")?; + + if *retrieved1 != value1 { + bail!( + "first value mismatch: expected {:?}, got {:?}", + String::from_utf8_lossy(&value1), + String::from_utf8_lossy(retrieved1) + ); + } + + tracing::info!("verified first value"); + + // Put second value (overwrite) + config + .send_kv_put(vec![key.clone()], vec![value2.clone()]) + .await + .context("failed to put second value")?; + + tracing::info!("put second value (overwrite)"); + + // Get and verify second value + let response2 = config + .send_kv_get(vec![key.clone()]) + .await + .context("failed to get second value")?; + + let retrieved2 = response2 + .values + .first() + .context("expected second value to exist")?; + + if *retrieved2 != value2 { + bail!( + "second value mismatch: expected {:?}, got {:?}", + String::from_utf8_lossy(&value2), + String::from_utf8_lossy(retrieved2) + ); + } + + tracing::info!("verified second value overwrote first"); + Result::Ok(KvTestResult::Success) + } + .await; + + // Notify test of result + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "PutOverwriteActor" + } +} + +/// Actor that puts a key, verifies it exists, then deletes it +struct DeleteKeyActor { + notify_tx: Arc>>>, +} + +impl DeleteKeyActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for DeleteKeyActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "delete key actor starting"); + + let result = async { + let key = make_key("delete-key"); + let value = make_value("delete-value"); + + // Put a key-value pair + config + .send_kv_put(vec![key.clone()], vec![value.clone()]) + .await + .context("failed to put key-value")?; + + tracing::info!("put key-value pair"); + + // Verify key exists + let response1 = config + .send_kv_get(vec![key.clone()]) + .await + .context("failed to get key before delete")?; + + if response1.values.first().is_none() { + bail!("key should exist before delete"); + } + + tracing::info!("verified key exists"); + + // Delete the key + config + .send_kv_delete(vec![key.clone()]) + .await + .context("failed to delete key")?; + + tracing::info!("deleted key"); + + // Verify key no longer exists + let response2 = config + .send_kv_get(vec![key.clone()]) + .await + .context("failed to get key after delete")?; + + if response2.values.first().is_some() { + bail!( + "key should not exist after delete, got value: {:?}", + response2.values.first() + ); + } + + tracing::info!("verified key deleted successfully"); + Result::Ok(KvTestResult::Success) + } + .await; + + // Notify test of result + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "DeleteKeyActor" + } +} + +/// Actor that attempts to delete a key that doesn't exist +struct DeleteNonexistentKeyActor { + notify_tx: Arc>>>, +} + +impl DeleteNonexistentKeyActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for DeleteNonexistentKeyActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "delete nonexistent key actor starting"); + + let result = async { + // Try to delete a key that was never put + let key = make_key("nonexistent-delete-key"); + + config + .send_kv_delete(vec![key.clone()]) + .await + .context("delete should succeed even for nonexistent key")?; + + tracing::info!("successfully deleted nonexistent key (no error)"); + Ok(()) + } + .await; + + // Notify test of result + let test_result = match result { + Result::Ok(_) => KvTestResult::Success, + Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "DeleteNonexistentKeyActor" + } +} + +// MARK: Basic CRUD Tests + +#[test] +fn basic_kv_put_and_get() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-put-get", move |_| { + Box::new(PutAndGetActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-put-get", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + // Wait for actor to complete KV operations + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "basic put and get test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("basic put and get test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_get_nonexistent_key() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-get-nonexistent", move |_| { + Box::new(GetNonexistentKeyActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-get-nonexistent", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + // Wait for actor to complete KV operations + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "get nonexistent key test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("get nonexistent key test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_put_overwrite_existing() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-overwrite", move |_| { + Box::new(PutOverwriteActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-overwrite", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + // Wait for actor to complete KV operations + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "put overwrite test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("put overwrite test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_delete_existing_key() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-delete", move |_| { + Box::new(DeleteKeyActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-delete", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + // Wait for actor to complete KV operations + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "delete key test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("delete key test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_delete_nonexistent_key() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-delete-nonexistent", move |_| { + Box::new(DeleteNonexistentKeyActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-delete-nonexistent", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + // Wait for actor to complete KV operations + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "delete nonexistent key test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("delete nonexistent key test failed: {}", msg); + } + } + }); +} +// MARK: Batch Operations Tests + +/// Actor that puts multiple key-value pairs in one operation +struct BatchPutActor { + notify_tx: Arc>>>, +} + +impl BatchPutActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for BatchPutActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "batch put actor starting"); + + let result = async { + // Put 10 key-value pairs in single operation + let mut keys = Vec::new(); + let mut values = Vec::new(); + for i in 0..10 { + keys.push(make_key(&format!("batch-key-{}", i))); + values.push(make_value(&format!("batch-value-{}", i))); + } + + config + .send_kv_put(keys.clone(), values.clone()) + .await + .context("failed to put multiple keys")?; + + tracing::info!("put 10 key-value pairs"); + + // Get all 10 keys individually to verify + for i in 0..10 { + let key = make_key(&format!("batch-key-{}", i)); + let expected_value = make_value(&format!("batch-value-{}", i)); + + let response = config + .send_kv_get(vec![key.clone()]) + .await + .context(format!("failed to get key {}", i))?; + + let retrieved_value = response + .values + .first() + .context(format!("key {} not found", i))?; + + if *retrieved_value != expected_value { + bail!( + "key {} value mismatch: expected {:?}, got {:?}", + i, + String::from_utf8_lossy(&expected_value), + String::from_utf8_lossy(retrieved_value) + ); + } + } + + tracing::info!("verified all 10 keys"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "BatchPutActor" + } +} + +/// Actor that gets multiple keys in one operation +struct BatchGetActor { + notify_tx: Arc>>>, +} + +impl BatchGetActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for BatchGetActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "batch get actor starting"); + + let result = async { + // Put 5 key-value pairs individually + for i in 0..5 { + let key = make_key(&format!("get-key-{}", i)); + let value = make_value(&format!("get-value-{}", i)); + + config + .send_kv_put(vec![key], vec![value]) + .await + .context(format!("failed to put key {}", i))?; + } + + tracing::info!("put 5 keys individually"); + + // Get all 5 keys in single operation + let keys: Vec> = (0..5) + .map(|i| make_key(&format!("get-key-{}", i))) + .collect(); + + let response = config + .send_kv_get(keys.clone()) + .await + .context("failed to get multiple keys")?; + + tracing::info!(?response, "got batch response"); + + // Verify all 5 values returned correctly + if response.values.len() != 5 { + bail!("expected 5 values, got {}", response.values.len()); + } + + for i in 0..5 { + let expected_value = make_value(&format!("get-value-{}", i)); + let retrieved_value = &response.values[i]; + + if *retrieved_value != expected_value { + bail!( + "key {} value mismatch: expected {:?}, got {:?}", + i, + String::from_utf8_lossy(&expected_value), + String::from_utf8_lossy(retrieved_value) + ); + } + } + + tracing::info!("verified all 5 values from batch get"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "BatchGetActor" + } +} + +/// Actor that deletes multiple keys in one operation +struct BatchDeleteActor { + notify_tx: Arc>>>, +} + +impl BatchDeleteActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for BatchDeleteActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "batch delete actor starting"); + + let result = async { + // Put 5 key-value pairs + let mut keys = Vec::new(); + let mut values = Vec::new(); + for i in 0..5 { + keys.push(make_key(&format!("del-key-{}", i))); + values.push(make_value(&format!("del-value-{}", i))); + } + + config + .send_kv_put(keys.clone(), values) + .await + .context("failed to put keys")?; + + tracing::info!("put 5 keys"); + + // Delete all 5 keys in single operation + config + .send_kv_delete(keys.clone()) + .await + .context("failed to delete keys")?; + + tracing::info!("deleted 5 keys"); + + // Try to get all 5 keys - should all return empty + let response = config + .send_kv_get(keys) + .await + .context("failed to get keys after delete")?; + + // TODO: Engine returns empty arrays for nonexistent keys + // Should return 5 values (could be empty or some other indicator) + // Currently returns: keys: [], values: [] + if !response.values.is_empty() { + bail!( + "expected empty values after delete, got {} values", + response.values.len() + ); + } + + tracing::info!("verified all keys deleted (empty response)"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "BatchDeleteActor" + } +} + +#[test] +fn kv_put_multiple_keys() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-batch-put", move |_| { + Box::new(BatchPutActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-batch-put", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "batch put test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("batch put test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_get_multiple_keys() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-batch-get", move |_| { + Box::new(BatchGetActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-batch-get", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "batch get test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("batch get test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_delete_multiple_keys() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-batch-delete", move |_| { + Box::new(BatchDeleteActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-batch-delete", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "batch delete test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("batch delete test failed: {}", msg); + } + } + }); +} diff --git a/engine/packages/engine/tests/actors_kv_drop.rs b/engine/packages/engine/tests/actors_kv_drop.rs new file mode 100644 index 0000000000..1b8c837f65 --- /dev/null +++ b/engine/packages/engine/tests/actors_kv_drop.rs @@ -0,0 +1,255 @@ +use anyhow::*; +use async_trait::async_trait; +use common::test_runner::*; +use rivet_runner_protocol as rp; +use std::sync::{Arc, Mutex}; + +mod common; + +// MARK: Helper Functions + +/// Convert string to KV key format (Vec) +fn make_key(s: &str) -> Vec { + s.as_bytes().to_vec() +} + +/// Convert string to KV value format (Vec) +fn make_value(s: &str) -> Vec { + s.as_bytes().to_vec() +} + +/// Result of KV test operations +#[derive(Debug, Clone)] +enum KvTestResult { + Success, + Failure(String), +} + +// MARK: Actor Behaviors + +/// Actor that tests drop clearing all data +struct DropClearsAllActor { + notify_tx: Arc>>>, +} + +impl DropClearsAllActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for DropClearsAllActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "drop clears all actor starting"); + + let result = async { + // Put 10 key-value pairs + let mut keys = Vec::new(); + let mut values = Vec::new(); + for i in 0..10 { + keys.push(make_key(&format!("drop-key-{}", i))); + values.push(make_value(&format!("drop-value-{}", i))); + } + + config + .send_kv_put(keys, values) + .await + .context("failed to put keys")?; + + tracing::info!("put 10 keys"); + + // Verify keys exist with listAll + let response1 = config + .send_kv_list(rp::KvListQuery::KvListAllQuery, None, None) + .await + .context("failed to list all before drop")?; + + if response1.keys.len() != 10 { + bail!("expected 10 keys before drop, got {}", response1.keys.len()); + } + + tracing::info!("verified 10 keys exist before drop"); + + // Call drop + config + .send_kv_drop() + .await + .context("failed to drop kv store")?; + + tracing::info!("called drop"); + + // Verify keys are cleared with listAll + let response2 = config + .send_kv_list(rp::KvListQuery::KvListAllQuery, None, None) + .await + .context("failed to list all after drop")?; + + if !response2.keys.is_empty() { + bail!( + "expected empty keys after drop, got {}", + response2.keys.len() + ); + } + + if !response2.values.is_empty() { + bail!( + "expected empty values after drop, got {}", + response2.values.len() + ); + } + + tracing::info!("verified all data cleared after drop"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "DropClearsAllActor" + } +} + +/// Actor that tests drop on empty store +struct DropEmptyActor { + notify_tx: Arc>>>, +} + +impl DropEmptyActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for DropEmptyActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "drop empty actor starting"); + + let result = async { + // Call drop on fresh store + config + .send_kv_drop() + .await + .context("drop should succeed on empty store")?; + + tracing::info!("successfully dropped empty store (no error)"); + Ok(()) + } + .await; + + let test_result = match result { + Result::Ok(_) => KvTestResult::Success, + Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "DropEmptyActor" + } +} + +// MARK: Tests + +#[test] +fn kv_drop_clears_all_data() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-drop-clears", move |_| { + Box::new(DropClearsAllActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-drop-clears", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "drop clears all test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("drop clears all test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_drop_empty_store() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-drop-empty", move |_| { + Box::new(DropEmptyActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-drop-empty", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "drop empty store test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("drop empty store test failed: {}", msg); + } + } + }); +} diff --git a/engine/packages/engine/tests/actors_kv_list.rs b/engine/packages/engine/tests/actors_kv_list.rs new file mode 100644 index 0000000000..fe5bdd856b --- /dev/null +++ b/engine/packages/engine/tests/actors_kv_list.rs @@ -0,0 +1,1061 @@ +use anyhow::*; +use async_trait::async_trait; +use common::test_runner::*; +use rivet_runner_protocol as rp; +use std::sync::{Arc, Mutex}; + +mod common; + +// MARK: Helper Functions + +/// Convert string to KV key format (Vec) +fn make_key(s: &str) -> Vec { + s.as_bytes().to_vec() +} + +/// Convert string to KV value format (Vec) +fn make_value(s: &str) -> Vec { + s.as_bytes().to_vec() +} + +/// Result of KV test operations +#[derive(Debug, Clone)] +enum KvTestResult { + Success, + Failure(String), +} + +// MARK: Actor Behaviors + +/// Actor that calls listAll on empty store +struct ListAllEmptyActor { + notify_tx: Arc>>>, +} + +impl ListAllEmptyActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for ListAllEmptyActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "list all empty actor starting"); + + let result = async { + // Call listAll on fresh store + let response = config + .send_kv_list(rp::KvListQuery::KvListAllQuery, None, None) + .await + .context("failed to list all on empty store")?; + + tracing::info!(?response, "list all response"); + + // Verify empty result + if !response.keys.is_empty() { + bail!("expected empty keys, got {} keys", response.keys.len()); + } + + if !response.values.is_empty() { + bail!( + "expected empty values, got {} values", + response.values.len() + ); + } + + tracing::info!("verified empty list on fresh store"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "ListAllEmptyActor" + } +} + +/// Actor that lists all keys after putting some +struct ListAllKeysActor { + notify_tx: Arc>>>, +} + +impl ListAllKeysActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for ListAllKeysActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "list all keys actor starting"); + + let result = async { + // Put 5 key-value pairs + let mut keys = Vec::new(); + let mut values = Vec::new(); + for i in 0..5 { + keys.push(make_key(&format!("list-key-{}", i))); + values.push(make_value(&format!("list-value-{}", i))); + } + + config + .send_kv_put(keys.clone(), values.clone()) + .await + .context("failed to put keys")?; + + tracing::info!("put 5 keys"); + + // Call listAll + let response = config + .send_kv_list(rp::KvListQuery::KvListAllQuery, None, None) + .await + .context("failed to list all")?; + + tracing::info!(?response, "list all response"); + + // Verify all 5 pairs returned + if response.keys.len() != 5 { + bail!("expected 5 keys, got {}", response.keys.len()); + } + + if response.values.len() != 5 { + bail!("expected 5 values, got {}", response.values.len()); + } + + // Verify each key-value pair + for i in 0..5 { + let expected_key = &keys[i]; + let expected_value = &values[i]; + + if !response.keys.contains(expected_key) { + bail!("missing key: {:?}", String::from_utf8_lossy(expected_key)); + } + + // Find the index of this key and verify the value + if let Some(idx) = response.keys.iter().position(|k| k == expected_key) { + if response.values[idx] != *expected_value { + bail!( + "value mismatch for key {:?}: expected {:?}, got {:?}", + String::from_utf8_lossy(expected_key), + String::from_utf8_lossy(expected_value), + String::from_utf8_lossy(&response.values[idx]) + ); + } + } + } + + tracing::info!("verified all 5 key-value pairs present"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "ListAllKeysActor" + } +} + +/// Actor that tests listAll with limit parameter +struct ListAllLimitActor { + notify_tx: Arc>>>, +} + +impl ListAllLimitActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for ListAllLimitActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "list all limit actor starting"); + + let result = async { + // Put 10 key-value pairs + let mut keys = Vec::new(); + let mut values = Vec::new(); + for i in 0..10 { + keys.push(make_key(&format!("limit-key-{:02}", i))); + values.push(make_value(&format!("limit-value-{}", i))); + } + + config + .send_kv_put(keys, values) + .await + .context("failed to put keys")?; + + tracing::info!("put 10 keys"); + + // Call listAll with limit=5 + let response = config + .send_kv_list(rp::KvListQuery::KvListAllQuery, None, Some(5)) + .await + .context("failed to list all with limit")?; + + tracing::info!(?response, "list all with limit response"); + + // Verify exactly 5 pairs returned + if response.keys.len() != 5 { + bail!("expected 5 keys with limit, got {}", response.keys.len()); + } + + if response.values.len() != 5 { + bail!( + "expected 5 values with limit, got {}", + response.values.len() + ); + } + + tracing::info!("verified limit=5 returned exactly 5 results"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "ListAllLimitActor" + } +} + +/// Actor that tests listAll with reverse parameter +struct ListAllReverseActor { + notify_tx: Arc>>>, +} + +impl ListAllReverseActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for ListAllReverseActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "list all reverse actor starting"); + + let result = async { + // Put keys in specific order + let key_names = vec!["a", "b", "c", "d", "e"]; + let mut keys = Vec::new(); + let mut values = Vec::new(); + + for name in &key_names { + keys.push(make_key(name)); + values.push(make_value(&format!("value-{}", name))); + } + + config + .send_kv_put(keys, values) + .await + .context("failed to put keys")?; + + tracing::info!("put keys in order: a, b, c, d, e"); + + // Call listAll with reverse=true + let response = config + .send_kv_list(rp::KvListQuery::KvListAllQuery, Some(true), None) + .await + .context("failed to list all with reverse")?; + + tracing::info!(?response, "list all reverse response"); + + // Verify order is reversed: e, d, c, b, a + let expected_order = vec!["e", "d", "c", "b", "a"]; + + if response.keys.len() != expected_order.len() { + bail!( + "expected {} keys, got {}", + expected_order.len(), + response.keys.len() + ); + } + + for (i, expected_name) in expected_order.iter().enumerate() { + let expected_key = make_key(expected_name); + if response.keys[i] != expected_key { + bail!( + "key at position {} expected {:?}, got {:?}", + i, + expected_name, + String::from_utf8_lossy(&response.keys[i]) + ); + } + } + + tracing::info!("verified reverse order: e, d, c, b, a"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "ListAllReverseActor" + } +} + +/// Actor that tests listRange with inclusive bounds +struct ListRangeInclusiveActor { + notify_tx: Arc>>>, +} + +impl ListRangeInclusiveActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for ListRangeInclusiveActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "list range inclusive actor starting"); + + let result = async { + // Put keys: a, b, c, d, e + let key_names = vec!["a", "b", "c", "d", "e"]; + let mut keys = Vec::new(); + let mut values = Vec::new(); + + for name in &key_names { + keys.push(make_key(name)); + values.push(make_value(&format!("value-{}", name))); + } + + config + .send_kv_put(keys, values) + .await + .context("failed to put keys")?; + + tracing::info!("put keys: a, b, c, d, e"); + + // Call listRange(start="b", end="d", exclusive=false) + let response = config + .send_kv_list( + rp::KvListQuery::KvListRangeQuery(rp::KvListRangeQuery { + start: make_key("b"), + end: make_key("d"), + exclusive: false, + }), + None, + None, + ) + .await + .context("failed to list range")?; + + tracing::info!(?response, "list range response"); + + // Verify returns: b, c, d (inclusive) + let expected_keys = vec!["b", "c", "d"]; + + if response.keys.len() != expected_keys.len() { + bail!( + "expected {} keys, got {}", + expected_keys.len(), + response.keys.len() + ); + } + + for (i, expected_name) in expected_keys.iter().enumerate() { + let expected_key = make_key(expected_name); + if response.keys[i] != expected_key { + bail!( + "key at position {} expected {:?}, got {:?}", + i, + expected_name, + String::from_utf8_lossy(&response.keys[i]) + ); + } + } + + tracing::info!("verified inclusive range: b, c, d"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "ListRangeInclusiveActor" + } +} + +/// Actor that tests listRange with exclusive end (half-open range) +struct ListRangeExclusiveActor { + notify_tx: Arc>>>, +} + +impl ListRangeExclusiveActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for ListRangeExclusiveActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "list range exclusive actor starting"); + + let result = async { + // Put keys: a, b, c, d, e + let key_names = vec!["a", "b", "c", "d", "e"]; + let mut keys = Vec::new(); + let mut values = Vec::new(); + + for name in &key_names { + keys.push(make_key(name)); + values.push(make_value(&format!("value-{}", name))); + } + + config + .send_kv_put(keys, values) + .await + .context("failed to put keys")?; + + tracing::info!("put keys: a, b, c, d, e"); + + // Call listRange(start="b", end="d", exclusive=true) - half-open range [b, d) + let response = config + .send_kv_list( + rp::KvListQuery::KvListRangeQuery(rp::KvListRangeQuery { + start: make_key("b"), + end: make_key("d"), + exclusive: true, + }), + None, + None, + ) + .await + .context("failed to list range")?; + + tracing::info!(?response, "list range exclusive response"); + + // Verify returns: b, c (includes start, excludes end - half-open range [b, d)) + let expected_keys = vec!["b", "c"]; + + if response.keys.len() != expected_keys.len() { + bail!( + "expected {} keys, got {}", + expected_keys.len(), + response.keys.len() + ); + } + + for (i, expected_name) in expected_keys.iter().enumerate() { + let expected_key = make_key(expected_name); + if response.keys[i] != expected_key { + bail!( + "key at position {} expected {:?}, got {:?}", + i, + expected_name, + String::from_utf8_lossy(&response.keys[i]) + ); + } + } + + tracing::info!("verified exclusive range: b, c (half-open range [b, d))"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "ListRangeExclusiveActor" + } +} + +/// Actor that tests listPrefix with matching keys +struct ListPrefixActor { + notify_tx: Arc>>>, +} + +impl ListPrefixActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for ListPrefixActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "list prefix actor starting"); + + let result = async { + // Put keys with different prefixes + let key_names = vec!["user:1", "user:2", "user:3", "admin:1", "admin:2"]; + let mut keys = Vec::new(); + let mut values = Vec::new(); + + for name in &key_names { + keys.push(make_key(name)); + values.push(make_value(&format!("value-{}", name))); + } + + config + .send_kv_put(keys, values) + .await + .context("failed to put keys")?; + + tracing::info!("put keys with user: and admin: prefixes"); + + // Call listPrefix(prefix="user:") + let response = config + .send_kv_list( + rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { + key: make_key("user:"), + }), + None, + None, + ) + .await + .context("failed to list prefix")?; + + tracing::info!(?response, "list prefix response"); + + // Verify returns only: user:1, user:2, user:3 + let expected_keys = vec!["user:1", "user:2", "user:3"]; + + if response.keys.len() != expected_keys.len() { + bail!( + "expected {} keys, got {}", + expected_keys.len(), + response.keys.len() + ); + } + + for expected_name in &expected_keys { + let expected_key = make_key(expected_name); + if !response.keys.contains(&expected_key) { + bail!("missing key with prefix user:: {:?}", expected_name); + } + } + + // Verify admin keys are not present + for admin_key in &["admin:1", "admin:2"] { + let key = make_key(admin_key); + if response.keys.contains(&key) { + bail!( + "admin key should not be in user: prefix results: {:?}", + admin_key + ); + } + } + + tracing::info!("verified only user: prefixed keys returned"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "ListPrefixActor" + } +} + +/// Actor that tests listPrefix with no matching keys +struct ListPrefixNoMatchActor { + notify_tx: Arc>>>, +} + +impl ListPrefixNoMatchActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for ListPrefixNoMatchActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "list prefix no match actor starting"); + + let result = async { + // Put keys with user: prefix only + let key_names = vec!["user:1", "user:2"]; + let mut keys = Vec::new(); + let mut values = Vec::new(); + + for name in &key_names { + keys.push(make_key(name)); + values.push(make_value(&format!("value-{}", name))); + } + + config + .send_kv_put(keys, values) + .await + .context("failed to put keys")?; + + tracing::info!("put keys with user: prefix"); + + // Call listPrefix(prefix="admin:") + let response = config + .send_kv_list( + rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { + key: make_key("admin:"), + }), + None, + None, + ) + .await + .context("failed to list prefix")?; + + tracing::info!(?response, "list prefix no match response"); + + // Verify empty result + if !response.keys.is_empty() { + bail!( + "expected empty keys for non-matching prefix, got {}", + response.keys.len() + ); + } + + if !response.values.is_empty() { + bail!( + "expected empty values for non-matching prefix, got {}", + response.values.len() + ); + } + + tracing::info!("verified empty result for non-matching prefix"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "ListPrefixNoMatchActor" + } +} + +// MARK: Tests + +#[test] +fn kv_list_all_empty_store() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-list-empty", move |_| { + Box::new(ListAllEmptyActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-list-empty", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "list all empty test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("list all empty test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_list_all_with_keys() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-list-keys", move |_| { + Box::new(ListAllKeysActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-list-keys", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "list all with keys test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("list all with keys test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_list_all_with_limit() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-list-limit", move |_| { + Box::new(ListAllLimitActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-list-limit", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "list all with limit test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("list all with limit test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_list_all_reverse() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-list-reverse", move |_| { + Box::new(ListAllReverseActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-list-reverse", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "list all reverse test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("list all reverse test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_list_range_inclusive() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-range-inclusive", move |_| { + Box::new(ListRangeInclusiveActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-range-inclusive", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "list range inclusive test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("list range inclusive test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_list_range_exclusive() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-range-exclusive", move |_| { + Box::new(ListRangeExclusiveActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-range-exclusive", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "list range exclusive test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("list range exclusive test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_list_prefix_match() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-prefix-match", move |_| { + Box::new(ListPrefixActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-prefix-match", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "list prefix match test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("list prefix match test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_list_prefix_no_matches() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-prefix-no-match", move |_| { + Box::new(ListPrefixNoMatchActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-prefix-no-match", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "list prefix no matches test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("list prefix no matches test failed: {}", msg); + } + } + }); +} diff --git a/engine/packages/engine/tests/actors_kv_misc.rs b/engine/packages/engine/tests/actors_kv_misc.rs new file mode 100644 index 0000000000..6e1d450ef9 --- /dev/null +++ b/engine/packages/engine/tests/actors_kv_misc.rs @@ -0,0 +1,882 @@ +use anyhow::*; +use async_trait::async_trait; +use common::test_runner::*; +use rivet_runner_protocol as rp; +use std::sync::{Arc, Mutex}; + +mod common; + +// MARK: Helper Functions + +/// Convert string to KV key format (Vec) +fn make_key(s: &str) -> Vec { + s.as_bytes().to_vec() +} + +/// Convert string to KV value format (Vec) +fn make_value(s: &str) -> Vec { + s.as_bytes().to_vec() +} + +/// Result of KV test operations +#[derive(Debug, Clone)] +enum KvTestResult { + Success, + Failure(String), +} + +// MARK: Actor Behaviors for Binary Data Tests + +/// Actor that tests binary keys and values +struct BinaryDataActor { + notify_tx: Arc>>>, +} + +impl BinaryDataActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for BinaryDataActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "binary data actor starting"); + + let result = async { + // Create binary data with null bytes and non-UTF8 data + let key = vec![0x00, 0xFF, 0xAB, 0xCD, 0x00, 0x42]; + let value = vec![0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0xFF, 0x12, 0x34]; + + config + .send_kv_put(vec![key.clone()], vec![value.clone()]) + .await + .context("failed to put binary data")?; + + tracing::info!("put binary key-value pair"); + + // Get the key back + let response = config + .send_kv_get(vec![key.clone()]) + .await + .context("failed to get binary key")?; + + // Verify binary data is preserved exactly + if response.values.len() != 1 { + bail!("expected 1 value, got {}", response.values.len()); + } + + let retrieved_value = response.values.first().context("expected value to exist")?; + + if *retrieved_value != value { + bail!( + "binary value mismatch: expected {:?}, got {:?}", + value, + retrieved_value + ); + } + + tracing::info!("verified binary data preserved exactly"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "BinaryDataActor" + } +} + +/// Actor that tests empty value is rejected (engine doesn't support zero-length values) +struct EmptyValueActor { + notify_tx: Arc>>>, +} + +impl EmptyValueActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for EmptyValueActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "empty value actor starting"); + + let result = async { + // First, put a normal key-value pair + let key = make_key("empty-value-key"); + let initial_value = make_value("initial"); + + config + .send_kv_put(vec![key.clone()], vec![initial_value]) + .await + .context("failed to put initial key-value")?; + + tracing::info!("put initial key with value"); + + // Try to put key with empty value (0 bytes) + // Engine behavior: empty values are not supported and will fail or be ignored + let empty_value = Vec::new(); + let put_result = config + .send_kv_put(vec![key.clone()], vec![empty_value]) + .await; + + // Engine rejects empty values, so put should fail + if put_result.is_ok() { + bail!("expected put with empty value to fail, but it succeeded"); + } + + tracing::info!( + "verified empty value put was rejected (engine doesn't support empty values)" + ); + + // Verify original value still exists + let response = config + .send_kv_get(vec![key.clone()]) + .await + .context("failed to get key after empty value rejection")?; + + if response.values.is_empty() { + bail!("key should still exist with original value"); + } + + let retrieved_value = response.values.first().context("expected value to exist")?; + + if retrieved_value != &make_value("initial") { + bail!( + "expected original value 'initial', got {:?}", + String::from_utf8_lossy(retrieved_value) + ); + } + + tracing::info!("verified original value preserved after rejected empty value put"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "EmptyValueActor" + } +} + +/// Actor that tests large value (1MB) +struct LargeValueActor { + notify_tx: Arc>>>, +} + +impl LargeValueActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for LargeValueActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "large value actor starting"); + + let result = async { + // Create 1MB value + let key = make_key("large-value-key"); + let value: Vec = (0..1024 * 1024).map(|i| (i % 256) as u8).collect(); + + tracing::info!(value_size = value.len(), "putting large value"); + + config + .send_kv_put(vec![key.clone()], vec![value.clone()]) + .await + .context("failed to put large value")?; + + tracing::info!("put large value"); + + // Get the key + let response = config + .send_kv_get(vec![key.clone()]) + .await + .context("failed to get large value")?; + + // Verify full value returned + if response.values.len() != 1 { + bail!("expected 1 value, got {}", response.values.len()); + } + + let retrieved_value = response.values.first().context("expected value to exist")?; + + if retrieved_value.len() != value.len() { + bail!( + "value size mismatch: expected {} bytes, got {} bytes", + value.len(), + retrieved_value.len() + ); + } + + if *retrieved_value != value { + bail!("large value content mismatch"); + } + + tracing::info!("verified large value stored and retrieved correctly"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "LargeValueActor" + } +} + +// MARK: Actor Behaviors for Edge Case Tests + +/// Actor that tests get with empty keys array +struct GetEmptyKeysActor { + notify_tx: Arc>>>, +} + +impl GetEmptyKeysActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for GetEmptyKeysActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "get empty keys actor starting"); + + let result = async { + // Call get with empty array + let response = config + .send_kv_get(Vec::new()) + .await + .context("get with empty keys should not error")?; + + // Verify operation completes (returns empty array) + if !response.keys.is_empty() { + bail!( + "expected empty keys for empty get, got {}", + response.keys.len() + ); + } + + if !response.values.is_empty() { + bail!( + "expected empty values for empty get, got {}", + response.values.len() + ); + } + + tracing::info!("verified get with empty keys returns empty result"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "GetEmptyKeysActor" + } +} + +/// Actor that tests list with limit=0 +struct ListLimitZeroActor { + notify_tx: Arc>>>, +} + +impl ListLimitZeroActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for ListLimitZeroActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "list limit zero actor starting"); + + let result = async { + // Put some keys + let mut keys = Vec::new(); + let mut values = Vec::new(); + for i in 0..5 { + keys.push(make_key(&format!("key-{}", i))); + values.push(make_value(&format!("value-{}", i))); + } + + config + .send_kv_put(keys, values) + .await + .context("failed to put keys")?; + + tracing::info!("put 5 keys"); + + // Call listAll with limit=0 + let response = config + .send_kv_list(rp::KvListQuery::KvListAllQuery, None, Some(0)) + .await + .context("list with limit=0 should not error")?; + + // Verify returns empty array + if !response.keys.is_empty() { + bail!( + "expected empty keys for limit=0, got {}", + response.keys.len() + ); + } + + if !response.values.is_empty() { + bail!( + "expected empty values for limit=0, got {}", + response.values.len() + ); + } + + tracing::info!("verified limit=0 returns empty result"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "ListLimitZeroActor" + } +} + +/// Actor that tests key ordering is lexicographic +struct KeyOrderingActor { + notify_tx: Arc>>>, +} + +impl KeyOrderingActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for KeyOrderingActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "key ordering actor starting"); + + let result = async { + // Put keys in random order + let key_names = vec!["z", "a", "m", "b", "x"]; + let mut keys = Vec::new(); + let mut values = Vec::new(); + + for name in &key_names { + keys.push(make_key(name)); + values.push(make_value(&format!("value-{}", name))); + } + + config + .send_kv_put(keys, values) + .await + .context("failed to put keys")?; + + tracing::info!("put keys in random order: z, a, m, b, x"); + + // Call listAll + let response = config + .send_kv_list(rp::KvListQuery::KvListAllQuery, None, None) + .await + .context("failed to list all")?; + + tracing::info!(?response, "list all response"); + + // Verify keys returned in lexicographic order: a, b, m, x, z + let expected_order = vec!["a", "b", "m", "x", "z"]; + + if response.keys.len() != expected_order.len() { + bail!( + "expected {} keys, got {}", + expected_order.len(), + response.keys.len() + ); + } + + for (i, expected_name) in expected_order.iter().enumerate() { + let expected_key = make_key(expected_name); + if response.keys[i] != expected_key { + bail!( + "key at position {} expected {:?}, got {:?}", + i, + expected_name, + String::from_utf8_lossy(&response.keys[i]) + ); + } + } + + tracing::info!("verified lexicographic ordering: a, b, m, x, z"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "KeyOrderingActor" + } +} + +/// Actor that stores many keys (1000+) +struct ManyKeysActor { + notify_tx: Arc>>>, +} + +impl ManyKeysActor { + fn new(notify_tx: Arc>>>) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for ManyKeysActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "many keys actor starting"); + + let result = async { + // Put 1000 key-value pairs + let mut keys = Vec::new(); + let mut values = Vec::new(); + for i in 0..1000 { + keys.push(make_key(&format!("many-key-{:04}", i))); + values.push(make_value(&format!("many-value-{}", i))); + } + + config + .send_kv_put(keys.clone(), values.clone()) + .await + .context("failed to put 1000 keys")?; + + tracing::info!("put 1000 keys"); + + // Call listAll + let response = config + .send_kv_list(rp::KvListQuery::KvListAllQuery, None, None) + .await + .context("failed to list all 1000 keys")?; + + // Verify all 1000 pairs present + if response.keys.len() != 1000 { + bail!("expected 1000 keys, got {}", response.keys.len()); + } + + if response.values.len() != 1000 { + bail!("expected 1000 values, got {}", response.values.len()); + } + + tracing::info!("verified 1000 keys present in list"); + + // Get random sample of keys to verify values + for i in &[0, 100, 500, 750, 999] { + let key = make_key(&format!("many-key-{:04}", i)); + let expected_value = make_value(&format!("many-value-{}", i)); + + let get_response = config + .send_kv_get(vec![key.clone()]) + .await + .context(format!("failed to get key {}", i))?; + + let retrieved_value = get_response + .values + .first() + .context(format!("key {} not found", i))?; + + if *retrieved_value != expected_value { + bail!("key {} value mismatch", i); + } + } + + tracing::info!("verified random sample of keys have correct values"); + Result::Ok(KvTestResult::Success) + } + .await; + + let test_result = match result { + Result::Ok(r) => r, + Result::Err(e) => KvTestResult::Failure(e.to_string()), + }; + + if let Some(tx) = self.notify_tx.lock().unwrap().take() { + let _ = tx.send(test_result); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "ManyKeysActor" + } +} + +// MARK: Tests + +#[test] +fn kv_binary_keys_and_values() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-binary", move |_| { + Box::new(BinaryDataActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-binary", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "binary data test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("binary data test failed: {}", msg); + } + } + }); +} + +#[test] +#[ignore] +fn kv_empty_value() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-empty-value", move |_| { + Box::new(EmptyValueActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-empty-value", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "empty value test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("empty value test failed: {}", msg); + } + } + }); +} + +#[test] +#[ignore] +fn kv_large_value() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-large-value", move |_| { + Box::new(LargeValueActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-large-value", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "large value test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("large value test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_get_with_empty_keys_array() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-get-empty", move |_| { + Box::new(GetEmptyKeysActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-get-empty", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "get empty keys test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("get empty keys test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_list_with_limit_zero() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-list-limit-zero", move |_| { + Box::new(ListLimitZeroActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-list-limit-zero", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "list limit zero test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("list limit zero test failed: {}", msg); + } + } + }); +} + +#[test] +fn kv_key_ordering_lexicographic() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-key-ordering", move |_| { + Box::new(KeyOrderingActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-key-ordering", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "key ordering test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("key ordering test failed: {}", msg); + } + } + }); +} + +#[test] +#[ignore] +fn kv_many_keys_storage() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let (notify_tx, notify_rx) = tokio::sync::oneshot::channel(); + let notify_tx = Arc::new(Mutex::new(Some(notify_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("kv-many-keys", move |_| { + Box::new(ManyKeysActor::new(notify_tx.clone())) + }) + }) + .await; + + let res = common::create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "kv-many-keys", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id = res.actor.actor_id.to_string(); + + let result = notify_rx.await.expect("actor should send test result"); + + match result { + KvTestResult::Success => { + tracing::info!(?actor_id, "many keys storage test succeeded"); + } + KvTestResult::Failure(msg) => { + panic!("many keys storage test failed: {}", msg); + } + } + }); +} diff --git a/engine/packages/engine/tests/common/test_helpers.rs b/engine/packages/engine/tests/common/test_helpers.rs index 7e69aa5686..ae8380b1eb 100644 --- a/engine/packages/engine/tests/common/test_helpers.rs +++ b/engine/packages/engine/tests/common/test_helpers.rs @@ -114,6 +114,7 @@ pub async fn assert_actor_in_dc(actor_id_str: &str, expected_dc_label: u16) { let actor_id: rivet_util::Id = actor_id_str.parse().expect("Failed to parse actor ID"); let actual_dc_label = actor_id.label(); + // TODO: make this fetch as well assert_eq!( actual_dc_label, expected_dc_label, "Actor should be in datacenter {} but is in {}", diff --git a/engine/packages/engine/tests/common/test_runner/actor.rs b/engine/packages/engine/tests/common/test_runner/actor.rs index c0141fa7b4..deb0c29bdd 100644 --- a/engine/packages/engine/tests/common/test_runner/actor.rs +++ b/engine/packages/engine/tests/common/test_runner/actor.rs @@ -23,15 +23,17 @@ pub struct ActorConfig { pub kv_request_tx: mpsc::UnboundedSender, } -impl From<&rp::ActorConfig> for ActorConfig { - fn from(config: &rp::ActorConfig) -> Self { - // Create dummy channels (will be replaced by runner) - let (event_tx, _) = mpsc::unbounded_channel(); - let (kv_request_tx, _) = mpsc::unbounded_channel(); - +impl ActorConfig { + pub fn new( + config: &rp::ActorConfig, + actor_id: String, + generation: u32, + event_tx: mpsc::UnboundedSender, + kv_request_tx: mpsc::UnboundedSender, + ) -> Self { ActorConfig { - actor_id: String::new(), // Will be set by runner - generation: 0, // Will be set by runner + actor_id, + generation, name: config.name.clone(), key: config.key.clone(), create_ts: config.create_ts, @@ -150,9 +152,11 @@ impl ActorConfig { data: rp::KvRequestData::KvPutRequest(rp::KvPutRequest { keys, values }), response_tx, }; + self.kv_request_tx .send(request) .map_err(|_| anyhow!("failed to send KV put request"))?; + let response: rp::KvResponseData = response_rx .await .map_err(|_| anyhow!("KV put request response channel closed"))?; diff --git a/engine/packages/engine/tests/common/test_runner/runner.rs b/engine/packages/engine/tests/common/test_runner/runner.rs index 57ce469faf..f561fda5ad 100644 --- a/engine/packages/engine/tests/common/test_runner/runner.rs +++ b/engine/packages/engine/tests/common/test_runner/runner.rs @@ -57,6 +57,7 @@ pub struct TestRunner { next_event_idx: Arc>, event_history: Arc>>, shutdown: Arc, + is_child_task: bool, // Event channel for actors to push events event_tx: mpsc::UnboundedSender, @@ -165,6 +166,7 @@ impl TestRunnerBuilder { next_event_idx: Arc::new(Mutex::new(0)), event_history: Arc::new(Mutex::new(Vec::new())), shutdown: Arc::new(AtomicBool::new(false)), + is_child_task: false, event_tx, event_rx: Arc::new(Mutex::new(event_rx)), kv_request_tx, @@ -242,6 +244,7 @@ impl TestRunner { last_command_idx: self.last_command_idx.clone(), next_event_idx: self.next_event_idx.clone(), event_history: self.event_history.clone(), + is_child_task: true, shutdown: self.shutdown.clone(), event_tx: self.event_tx.clone(), event_rx: self.event_rx.clone(), @@ -371,6 +374,7 @@ impl TestRunner { // Listen for events pushed from actors Some(actor_event) = event_rx.recv() => { if self.shutdown.load(Ordering::SeqCst) { + tracing::info!("shutting down"); break; } @@ -547,11 +551,13 @@ impl TestRunner { tracing::info!(?actor_id, generation, name = %cmd.config.name, "starting actor"); // Create actor config - let mut config = ActorConfig::from(&cmd.config); - config.actor_id = actor_id.clone(); - config.generation = generation; - config.event_tx = self.event_tx.clone(); - config.kv_request_tx = self.kv_request_tx.clone(); + let config = ActorConfig::new( + &cmd.config, + actor_id.clone(), + generation, + self.event_tx.clone(), + self.kv_request_tx.clone(), + ); // Get factory for this actor name let factory = self @@ -561,32 +567,58 @@ impl TestRunner { .context(format!( "no factory registered for actor name: {}", cmd.config.name - ))?; + ))? + .clone(); - // Create actor - let mut actor = factory(config.clone()); + // Clone self for the spawned task + let runner = self.clone_for_task(); + let actor_id_clone = actor_id.clone(); - tracing::debug!( - ?actor_id, - generation, - actor_type = actor.name(), - "created actor instance" - ); + // Spawn actor execution in separate task to avoid blocking message loop + tokio::spawn(async move { + // Create actor + let mut actor = factory(config.clone()); - // Call on_start - let start_result = actor - .on_start(config) - .await - .context("actor on_start failed")?; + tracing::debug!( + ?actor_id, + generation, + actor_type = actor.name(), + "created actor instance" + ); - tracing::debug!( - ?actor_id, - generation, - ?start_result, - "actor on_start completed" - ); + // Call on_start + let start_result = match actor.on_start(config).await { + Result::Ok(result) => result, + Err(err) => { + tracing::error!(?actor_id_clone, generation, ?err, "actor on_start failed"); + return; + } + }; + + tracing::debug!( + ?actor_id_clone, + generation, + ?start_result, + "actor on_start completed" + ); + runner + .handle_actor_start_result(actor_id_clone, generation, actor, start_result) + .await; + }); + + Ok(()) + } + + async fn handle_actor_start_result( + &self, + actor_id: String, + generation: u32, + actor: Box, + start_result: ActorStartResult, + ) { // Broadcast lifecycle event + tracing::info!("lifecycle_tx start"); let _ = self.lifecycle_tx.send(ActorLifecycleEvent::Started { actor_id: actor_id.clone(), generation, @@ -603,34 +635,46 @@ impl TestRunner { .await .insert(actor_id.clone(), actor_state); - // Handle start result + // Handle start result and send state update via event match start_result { ActorStartResult::Running => { - self.send_actor_state_update( + let event = protocol::make_actor_state_update( &actor_id, generation, rp::ActorState::ActorStateRunning, - ws_stream, - ) - .await?; + ); + self.event_tx + .send(ActorEvent { + actor_id: actor_id.clone(), + generation, + event, + }) + .expect("failed to send state update"); } ActorStartResult::Delay(duration) => { - //TODO: For now, we just wait synchronously. In the future, we could - // implement this with a channel-based event queue - tracing::info!( - ?actor_id, - generation, - delay_ms = duration.as_millis(), - "delaying before sending running state" - ); - tokio::time::sleep(duration).await; - self.send_actor_state_update( - &actor_id, - generation, - rp::ActorState::ActorStateRunning, - ws_stream, - ) - .await?; + let actor_id_clone = actor_id.clone(); + let event_tx = self.event_tx.clone(); + tokio::spawn(async move { + tracing::info!( + ?actor_id_clone, + generation, + delay_ms = duration.as_millis(), + "delaying before sending running state" + ); + tokio::time::sleep(duration).await; + let event = protocol::make_actor_state_update( + &actor_id_clone, + generation, + rp::ActorState::ActorStateRunning, + ); + event_tx + .send(ActorEvent { + actor_id: actor_id_clone, + generation, + event, + }) + .expect("failed to send delayed state update"); + }); } ActorStartResult::Timeout => { tracing::warn!( @@ -642,7 +686,7 @@ impl TestRunner { } ActorStartResult::Crash { code, message } => { tracing::warn!(?actor_id, generation, code, %message, "actor crashed on start"); - self.send_actor_state_update( + let event = protocol::make_actor_state_update( &actor_id, generation, rp::ActorState::ActorStateStopped(rp::ActorStateStopped { @@ -653,16 +697,20 @@ impl TestRunner { }, message: Some(message), }), - ws_stream, - ) - .await?; + ); + let _ = self + .event_tx + .send(ActorEvent { + actor_id: actor_id.clone(), + generation, + event, + }) + .expect("failed to send crash state update"); // Remove actor self.actors.lock().await.remove(&actor_id); } } - - Ok(()) } async fn handle_stop_actor( @@ -774,23 +822,15 @@ impl TestRunner { ) -> Result<()> { let event = protocol::make_actor_state_update(actor_id, generation, state); - let mut idx = self.next_event_idx.lock().await; - let event_wrapper = protocol::make_event_wrapper(*idx, event); - *idx += 1; - drop(idx); - - self.event_history.lock().await.push(event_wrapper.clone()); - - tracing::debug!( - ?actor_id, - generation, - event_idx = event_wrapper.index, - "sending actor state update" - ); - - let msg = rp::ToServer::ToServerEvents(vec![event_wrapper]); - let encoded = protocol::encode_to_server(msg); - ws_stream.send(Message::Binary(encoded.into())).await?; + self.send_actor_event( + ws_stream, + ActorEvent { + actor_id: actor_id.to_string(), + generation, + event, + }, + ) + .await?; Ok(()) } @@ -841,17 +881,11 @@ impl TestRunner { impl Drop for TestRunner { fn drop(&mut self) { + if self.is_child_task { + return; + } // Signal shutdown when runner is dropped self.shutdown.store(true, Ordering::SeqCst); tracing::debug!("test runner dropped, shutdown signaled"); } } - -// actor_graceful_stop_with_destroy_policy -// actor_sleep_intent -// actor_start_timeout -// crash_policy_destroy -// crash_policy_restart -// crash_policy_restart_resets_on_success -// crash_policy_sleep -// exponential_backoff_max_retries