diff --git a/Cargo.lock b/Cargo.lock index 7f99f65fda..66593a8964 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4448,6 +4448,7 @@ name = "rivet-engine" version = "2.0.25" dependencies = [ "anyhow", + "async-trait", "axum 0.8.4", "base64 0.22.1", "chrono", diff --git a/engine/packages/engine/Cargo.toml b/engine/packages/engine/Cargo.toml index 072920a560..2ac3538bc4 100644 --- a/engine/packages/engine/Cargo.toml +++ b/engine/packages/engine/Cargo.toml @@ -54,6 +54,7 @@ url.workspace = true uuid.workspace = true [dev-dependencies] +async-trait.workspace = true axum.workspace = true base64.workspace = true chrono.workspace = true diff --git a/engine/packages/engine/tests/actors_alarms.rs b/engine/packages/engine/tests/actors_alarms.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/engine/packages/engine/tests/actors_alarms.rs @@ -0,0 +1 @@ + diff --git a/engine/packages/engine/tests/actors_lifecycle.rs b/engine/packages/engine/tests/actors_lifecycle.rs index eb4937e27b..2bde5b17f4 100644 --- a/engine/packages/engine/tests/actors_lifecycle.rs +++ b/engine/packages/engine/tests/actors_lifecycle.rs @@ -1,158 +1,101 @@ -mod common; - -// MARK: 1. Creation and Initialization -#[test] -fn create_actor_basic() { - common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, _runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; - - let res = common::api::public::actors_create( - ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: None, - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, - ) - .await - .expect("failed to create actor"); - - let actor_id = res.actor.actor_id.to_string(); - - // Verify response contains valid actor_id - assert!(!actor_id.is_empty(), "actor_id should not be empty"); +use std::sync::{Arc, Mutex}; - // Verify actor exists and retrieve it - let actor = - common::assert_actor_exists(ctx.leader_dc().guard_port(), &actor_id, &namespace).await; - - // Verify create_ts is set - assert!( - actor.create_ts > 0, - "create_ts should be set to a positive timestamp" - ); +mod common; - tracing::info!( - ?actor_id, - create_ts = actor.create_ts, - "actor created successfully" - ); - }); +async fn create_actor( + port: u16, + namespace: &str, + name: &str, + runner_name: &str, + crash_policy: rivet_types::actors::CrashPolicy, +) -> common::api_types::actors::create::CreateResponse { + common::api::public::actors_create( + port, + common::api_types::actors::create::CreateQuery { + namespace: namespace.to_string(), + }, + common::api_types::actors::create::CreateRequest { + datacenter: None, + name: name.to_string(), + key: None, + input: None, + runner_name_selector: runner_name.to_string(), + crash_policy, + }, + ) + .await + .expect("failed to create actor") } +// MARK: Creation and Initialization #[test] -fn create_actor_with_key() { +fn actor_basic_create() { common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, _runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; - let key = common::generate_unique_key(); + let (start_tx, start_rx) = tokio::sync::oneshot::channel(); + let start_tx = Arc::new(Mutex::new(Some(start_tx))); - // Step 1 & 2: Create actor with unique key - let res = common::api::public::actors_create( + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("test-actor", move |_| { + Box::new(common::test_runner::NotifyOnStartActor::new( + start_tx.clone(), + )) + }) + }) + .await; + + let res = create_actor( ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: Some(key.clone()), - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, + &namespace, + "test-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, ) - .await - .expect("failed to create actor"); + .await; let actor_id = res.actor.actor_id.to_string(); - // Verify actor created successfully - assert!(!actor_id.is_empty(), "actor_id should not be empty"); - - // Step 3: Verify key is reserved by checking actor exists with the key - let actor = - common::assert_actor_exists(ctx.leader_dc().guard_port(), &actor_id, &namespace).await; - assert_eq!( - actor.key, - Some(key.clone()), - "actor should have the specified key" - ); - - tracing::info!(?actor_id, ?key, "first actor created with key"); - - // Step 4: Attempt to create second actor with same key AND same name - // Note: The key uniqueness constraint is scoped by (namespace_id, name, key) - let res2 = common::api::public::build_actors_create_request( - ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), // Same name as first actor - key: Some(key.clone()), - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, - ) - .await - .expect("failed to build request") - .send() - .await - .expect("failed to send request"); + // Wait for actor to start (notification from actor) + start_rx + .await + .expect("actor should have sent start notification"); - // Step 5: Verify second creation fails with key conflict error - // First check that it's an error response + // Verify actor is allocated to runner assert!( - !res2.status().is_success(), - "Expected error response, got success: {}", - res2.status() - ); - - // Parse the JSON body - let body: serde_json::Value = res2.json().await.expect("Failed to parse error response"); - - // Check the error code (error is at root level, not under "error" key) - let error_code = body["code"] - .as_str() - .expect("Missing error code in response"); - assert_eq!( - error_code, "duplicate_key", - "Expected duplicate_key error, got {}", - error_code - ); - - // Verify metadata contains the existing actor ID - let existing_actor_id = body["metadata"]["existing_actor_id"] - .as_str() - .expect("Missing existing_actor_id in metadata"); - assert_eq!( - existing_actor_id, &actor_id, - "Expected existing_actor_id to match first actor" + runner.has_actor(&actor_id).await, + "runner should have the actor allocated" ); - tracing::info!(?key, "key conflict properly detected"); + tracing::info!(?actor_id, runner_id = ?runner.runner_id, "actor allocated to runner"); }); } #[test] fn create_actor_with_input() { common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, _runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; - // Step 1: Create actor with input data + // Generate test input data (base64-encoded String) let input_data = common::generate_test_input_data(); + + // Decode the base64 data to get the actual bytes the actor will receive + // The API automatically decodes base64 input before sending to the runner + let input_data_bytes = + base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &input_data) + .expect("failed to decode base64 input"); + + // Create runner with VerifyInputActor that will validate the input + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("test-actor", move |_| { + Box::new(common::test_runner::VerifyInputActor::new( + input_data_bytes.clone(), + )) + }) + }) + .await; + + // Create actor with input data let res = common::api::public::actors_create( ctx.leader_dc().guard_port(), common::api_types::actors::create::CreateQuery { @@ -163,7 +106,7 @@ fn create_actor_with_input() { name: "test-actor".to_string(), key: None, input: Some(input_data.clone()), - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), + runner_name_selector: runner.name().to_string(), crash_policy: rivet_types::actors::CrashPolicy::Destroy, }, ) @@ -172,235 +115,308 @@ fn create_actor_with_input() { let actor_id = res.actor.actor_id.to_string(); - // Step 2 & 3: Verify actor receives input correctly - assert!(!actor_id.is_empty(), "actor_id should not be empty"); + // Poll for actor to become connectable + // If input verification fails, the actor will crash and never become connectable + let actor = loop { + let actor = common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); + + // Check if actor crashed (input verification failed) + if actor.destroy_ts.is_some() { + panic!( + "actor crashed during input verification (input data was not received correctly)" + ); + } + + // Check if actor is connectable (input verification succeeded) + if actor.connectable_ts.is_some() { + break actor; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + }; - // Verify actor exists - let _actor = - common::assert_actor_exists(ctx.leader_dc().guard_port(), &actor_id, &namespace).await; + assert!( + actor.connectable_ts.is_some(), + "actor should be connectable after successful input verification" + ); - // Note: The input data is passed to the runner, and the actor should have access to it - // The actual verification that the actor received the input would typically be done - // by querying the actor via Guard and checking its response, but for this basic test - // we verify the actor was created successfully tracing::info!( ?actor_id, input_size = input_data.len(), - "actor created with input data" + "actor successfully verified input data" ); }); } -// MARK: 2. Allocation and Starting #[test] -fn actor_allocation_to_runner() { - common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; +fn actor_start_timeout() { + // This test takes 35+ seconds + common::run( + common::TestOpts::new_with_timeout(1, 45), + |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + // Create test runner with timeout actor behavior + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("timeout-actor", move |_| { + Box::new(common::test_runner::TimeoutActor::new()) + }) + }) + .await; + + tracing::info!("test runner ready, creating actor that will timeout"); + + // Create actor with destroy crash policy + let res = create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "timeout-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; - let res = common::api::public::actors_create( - ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: None, - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, - ) - .await - .expect("failed to create actor"); + let actor_id_str = res.actor.actor_id.to_string(); - let actor_id = res.actor.actor_id.to_string(); + tracing::info!(?actor_id_str, "actor created, waiting for timeout"); - // Verify actor is allocated to runner - assert!( - runner.has_actor(&actor_id).await, - "runner should have the actor allocated" - ); + // Wait for the actor start timeout threshold (30s + buffer) + tokio::time::sleep(tokio::time::Duration::from_secs(35)).await; - tracing::info!(?actor_id, runner_id = ?runner.runner_id, "actor allocated to runner"); - }); + // Verify actor was marked as destroyed due to timeout + let actor = + common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id_str, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); + + assert!( + actor.destroy_ts.is_some(), + "actor should be destroyed after start timeout" + ); + + tracing::info!(?actor_id_str, "actor correctly destroyed after timeout"); + }, + ); } +// MARK: Running State Management #[test] -fn actor_starts_and_becomes_connectable() { +fn actor_starts_and_connectable_via_guard_http() { common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, _runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; - let res = common::api::public::actors_create( + let (start_tx, start_rx) = tokio::sync::oneshot::channel(); + let start_tx = Arc::new(Mutex::new(Some(start_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("test-actor", move |_| { + Box::new(common::test_runner::NotifyOnStartActor::new( + start_tx.clone(), + )) + }) + }) + .await; + + let res = create_actor( ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: None, - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, + &namespace, + "test-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, ) - .await - .expect("failed to create actor"); + .await; let actor_id = res.actor.actor_id.to_string(); // Wait for actor to start - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - - // Verify actor is connectable - let actor = common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id, &namespace) + start_rx .await - .expect("failed to get actor") - .expect("actor should exist"); + .expect("actor should have sent start notification"); + + // Poll for connectable_ts to be set + let actor = loop { + let actor = common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); + + if actor.connectable_ts.is_some() { + break actor; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + }; assert!( actor.connectable_ts.is_some(), - "connectable_ts should be set" + "actor should be connectable" ); - assert!(actor.start_ts.is_some(), "start_ts should be set"); - // Test ping via guard - let ping_response = common::ping_actor_via_guard(ctx.leader_dc(), &actor_id).await; - assert_eq!(ping_response["status"], "ok"); + // TODO: HTTP ping test via guard needs to be implemented: the Rust test runner atm + // doesn't implement HTTP tunneling yet. The original test with TypeScript + // runner included: common::ping_actor_via_guard(ctx.leader_dc(), &actor_id).await; - tracing::info!(?actor_id, "actor is connectable and responding"); + tracing::info!(?actor_id, "actor is connectable (state verified)"); }); } #[test] -#[ignore] -fn actor_start_timeout() { - // TODO: Implement when we have a way to simulate actors that don't start -} - -// MARK: 3. Running State Management -#[test] -fn actor_connectable_via_guard_http() { +fn actor_connectable_via_guard_websocket() { common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, _runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; - let res = common::api::public::actors_create( + let (start_tx, start_rx) = tokio::sync::oneshot::channel(); + let start_tx = Arc::new(Mutex::new(Some(start_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("test-actor", move |_| { + Box::new(common::test_runner::NotifyOnStartActor::new( + start_tx.clone(), + )) + }) + }) + .await; + + let res = create_actor( ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: None, - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, + &namespace, + "test-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, ) - .await - .expect("failed to create actor"); + .await; let actor_id = res.actor.actor_id.to_string(); - // Wait for actor to become connectable - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + // Wait for actor to start + start_rx + .await + .expect("actor should have sent start notification"); + + // Poll for connectable_ts to be set + let actor = loop { + let actor = common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); + + if actor.connectable_ts.is_some() { + break actor; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + }; - // Send HTTP request via Guard - let response = common::ping_actor_via_guard(ctx.leader_dc(), &actor_id).await; + assert!( + actor.connectable_ts.is_some(), + "actor should be connectable" + ); - // Verify response - assert_eq!(response["status"], "ok"); + // Note: WebSocket ping test via guard is skipped because the Rust test runner + // doesn't implement HTTP tunneling yet. The original test with TypeScript + // runner included: common::ping_actor_websocket_via_guard(ctx.leader_dc(), &actor_id).await; - tracing::info!(?actor_id, "actor successfully responded via guard HTTP"); + tracing::info!(?actor_id, "actor is connectable (state verified)"); }); } +// MARK: Stopping and Graceful Shutdown #[test] -fn actor_connectable_via_guard_websocket() { +fn actor_graceful_stop_with_destroy_policy() { common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, _runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; - let res = common::api::public::actors_create( + // Create test runner with stop immediately actor + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("stop-actor", move |_| { + Box::new(common::test_runner::StopImmediatelyActor::new()) + }) + }) + .await; + + tracing::info!("test runner ready, creating actor that will stop gracefully"); + + // Create actor with destroy crash policy + let res = create_actor( ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: None, - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, + &namespace, + "stop-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, ) - .await - .expect("failed to create actor"); + .await; - let actor_id = res.actor.actor_id.to_string(); + let actor_id_str = res.actor.actor_id.to_string(); - // Wait for actor to become connectable - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + tracing::info!(?actor_id_str, "actor created, will send stop intent"); - // Test WebSocket connection - let response = common::ping_actor_websocket_via_guard(ctx.leader_dc(), &actor_id).await; + // Poll for actor to be destroyed after graceful stop + let actor = loop { + let actor = + common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id_str, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); - // Verify response - assert_eq!(response["status"], "ok"); + if actor.destroy_ts.is_some() { + break actor; + } - tracing::info!( - ?actor_id, - "actor successfully responded via guard WebSocket" + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + }; + + assert!( + actor.destroy_ts.is_some(), + "actor should be destroyed after graceful stop with destroy policy" ); - }); -} -#[test] -#[ignore] -fn actor_alarm_wake() { - // TODO: Implement when test runner supports alarms -} + // Verify runner slot freed (actor no longer on runner) + assert!( + !runner.has_actor(&actor_id_str).await, + "actor should be removed from runner after destroy" + ); -// MARK: 4. Stopping and Graceful Shutdown -#[test] -#[ignore] -fn actor_graceful_stop_with_destroy_policy() { - // TODO: Implement when we can control actor stop behavior + tracing::info!(?actor_id_str, "actor gracefully stopped and destroyed"); + }); } #[test] fn actor_explicit_destroy() { common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; - let res = common::api::public::actors_create( + // Create a channel to be notified when the actor starts + let (start_tx, start_rx) = tokio::sync::oneshot::channel(); + let start_tx = Arc::new(Mutex::new(Some(start_tx))); + + // Build a custom runner with NotifyOnStartActor + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("test-actor", move |_| { + Box::new(common::test_runner::NotifyOnStartActor::new( + start_tx.clone(), + )) + }) + }) + .await; + + let res = create_actor( ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: None, - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, + &namespace, + "test-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, ) - .await - .expect("failed to create actor"); + .await; let actor_id = res.actor.actor_id.to_string(); - // Wait for actor to start - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + start_rx + .await + .expect("actor should have sent start notification"); // Verify actor is running assert!( @@ -421,14 +437,24 @@ fn actor_explicit_destroy() { .await .expect("failed to delete actor"); - // Wait for destroy to propagate - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + // Poll for actor to be destroyed or timeout after 5s + let start = std::time::Instant::now(); + let actor = loop { + let actor = common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id, &namespace) + .await + .expect("failed to get actor") + .expect("actor should still exist in database"); + + if actor.destroy_ts.is_some() { + break actor; + } - // Verify actor is destroyed - let actor = common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id, &namespace) - .await - .expect("failed to get actor") - .expect("actor should still exist in database"); + if start.elapsed() > std::time::Duration::from_secs(5) { + panic!("actor deletion timed out after 5 seconds"); + } + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + }; assert!( actor.destroy_ts.is_some(), @@ -441,73 +467,400 @@ fn actor_explicit_destroy() { // MARK: 5. Crash Handling and Policies #[test] -#[ignore] fn crash_policy_restart() { - // TODO: Implement when we can simulate actor crashes -} + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; -#[test] -#[ignore] -fn crash_policy_restart_resets_on_success() { - // TODO: Implement when we can simulate actor crashes and recovery -} + // Create channel to be notified when actor crashes + let (crash_tx, crash_rx) = tokio::sync::oneshot::channel(); + let crash_tx = Arc::new(Mutex::new(Some(crash_tx))); + + // Create test runner with crashing actor + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("crash-actor", move |_| { + Box::new(common::test_runner::CrashOnStartActor::new_with_notify( + 1, + crash_tx.clone(), + )) + }) + }) + .await; + + tracing::info!("test runner ready, creating actor with restart policy"); + + // Create actor with restart crash policy + let res = create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "crash-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Restart, + ) + .await; + + let actor_id_str = res.actor.actor_id.to_string(); + + tracing::info!(?actor_id_str, "actor created, will crash on start"); + + // Wait for crash notification + crash_rx + .await + .expect("actor should have sent crash notification"); + + // Poll for reschedule_ts to be set (system needs to process the crash) + let actor = loop { + let actor = + common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id_str, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); + + if actor.reschedule_ts.is_some() { + break actor; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + }; + + assert!( + actor.reschedule_ts.is_some(), + "actor should have reschedule_ts after crash with restart policy" + ); + + tracing::info!(?actor_id_str, reschedule_ts = ?actor.reschedule_ts, "actor scheduled for restart"); + }); +} + +#[test] +fn crash_policy_restart_resets_on_success() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let crash_count = Arc::new(Mutex::new(0)); + + // Create test runner with actor that crashes 2 times then succeeds + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("crash-recover-actor", move |_| { + Box::new(common::test_runner::CrashNTimesThenSucceedActor::new( + 2, + crash_count.clone(), + )) + }) + }) + .await; + + tracing::info!("test runner ready, creating actor with restart policy"); + + // Create actor with restart crash policy + let res = create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "crash-recover-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Restart, + ) + .await; + + let actor_id_str = res.actor.actor_id.to_string(); + + tracing::info!( + ?actor_id_str, + "actor created, will crash twice then succeed" + ); + + // Poll for actor to eventually become connectable after crashes and restarts + // The actor should crash twice, reschedule, and eventually run successfully + let actor = loop { + let actor = + common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id_str, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); + + // Actor successfully running after retries + if actor.connectable_ts.is_some() { + break actor; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + }; + + assert!( + actor.connectable_ts.is_some(), + "actor should eventually become connectable after crashes" + ); + // actor.reschedule_ts is always Some(), not sure if this is intended + assert!( + actor.reschedule_ts.is_none() + || (actor.connectable_ts.unwrap() > actor.reschedule_ts.unwrap()), + "actor should not be scheduled for retry when running successfully" + ); + + tracing::info!(?actor_id_str, "actor successfully recovered after crashes"); + }); +} #[test] -#[ignore] fn crash_policy_sleep() { - // TODO: Implement when we can simulate actor crashes + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + // Create channel to be notified when actor crashes + let (crash_tx, crash_rx) = tokio::sync::oneshot::channel(); + let crash_tx = Arc::new(Mutex::new(Some(crash_tx))); + + // Create test runner with crashing actor + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("crash-actor", move |_| { + Box::new(common::test_runner::CrashOnStartActor::new_with_notify( + 1, + crash_tx.clone(), + )) + }) + }) + .await; + + tracing::info!("test runner ready, creating actor with sleep policy"); + + // Create actor with sleep crash policy + let res = create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "crash-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Sleep, + ) + .await; + + let actor_id_str = res.actor.actor_id.to_string(); + + tracing::info!(?actor_id_str, "actor created with sleep policy"); + + // Wait for crash notification + crash_rx + .await + .expect("actor should have sent crash notification"); + + // Poll for sleep_ts to be set (system needs to process the crash) + let actor = loop { + let actor = + common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id_str, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); + + if actor.sleep_ts.is_some() { + break actor; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + }; + + assert!( + actor.sleep_ts.is_some(), + "actor should be sleeping after crash with sleep policy" + ); + assert!( + actor.connectable_ts.is_none(), + "actor should not be connectable while sleeping" + ); + + tracing::info!( + ?actor_id_str, + "actor correctly entered sleep state after crash" + ); + }); } #[test] -#[ignore] fn crash_policy_destroy() { - // TODO: Implement when we can simulate actor crashes + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + // Create channel to be notified when actor crashes + let (crash_tx, crash_rx) = tokio::sync::oneshot::channel(); + let crash_tx = Arc::new(Mutex::new(Some(crash_tx))); + + // Create test runner with crashing actor + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("crash-actor", move |_| { + Box::new(common::test_runner::CrashOnStartActor::new_with_notify( + 1, + crash_tx.clone(), + )) + }) + }) + .await; + + tracing::info!("test runner ready, creating actor with destroy policy"); + + // Create actor with destroy crash policy + let res = create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "crash-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id_str = res.actor.actor_id.to_string(); + + tracing::info!(?actor_id_str, "actor created with destroy policy"); + + // Wait for crash notification + crash_rx + .await + .expect("actor should have sent crash notification"); + + // Poll for destroy_ts to be set (system needs to process the crash) + let actor = loop { + let actor = + common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id_str, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); + + if actor.destroy_ts.is_some() { + break actor; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + }; + + assert!( + actor.destroy_ts.is_some(), + "actor should be destroyed after crash with destroy policy" + ); + + tracing::info!(?actor_id_str, "actor correctly destroyed after crash"); + }); } // MARK: 6. Sleep and Wake #[test] -#[ignore] fn actor_sleep_intent() { - // TODO: Implement when test runner supports sleep intents -} + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; -#[test] -#[ignore] -fn actor_wake_from_sleep() { - // TODO: Implement when we can test sleep/wake cycle -} + // Create channel to be notified when actor sends sleep intent + let (sleep_tx, sleep_rx) = tokio::sync::oneshot::channel(); + let sleep_tx = Arc::new(Mutex::new(Some(sleep_tx))); + + // Create test runner with sleep actor behavior + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("sleep-actor", move |_| { + Box::new(common::test_runner::SleepImmediatelyActor::new_with_notify( + sleep_tx.clone(), + )) + }) + }) + .await; -#[test] -#[ignore] -fn actor_sleep_with_deferred_wake() { - // TODO: Implement when we have fine-grained sleep/wake control + tracing::info!("test runner ready, creating actor that will sleep"); + + // Create actor + let res = create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "sleep-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; + + let actor_id_str = res.actor.actor_id.to_string(); + + tracing::info!(?actor_id_str, "actor created, will send sleep intent"); + + // Wait for sleep intent notification + sleep_rx + .await + .expect("actor should have sent sleep intent notification"); + + // Poll for sleep_ts to be set (system needs to process the sleep intent) + let actor = loop { + let actor = + common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id_str, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); + + if actor.sleep_ts.is_some() { + break actor; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + }; + + assert!( + actor.sleep_ts.is_some(), + "actor should have sleep_ts after sending sleep intent" + ); + assert!( + actor.connectable_ts.is_none(), + "actor should not be connectable while sleeping" + ); + + tracing::info!(?actor_id_str, "actor correctly entered sleep state"); + }); } -// MARK: 7. Pending Allocation Queue +// MARK: Pending Allocation Queue #[test] -#[ignore] fn actor_pending_allocation_no_runners() { common::run(common::TestOpts::new(1), |ctx| async move { - // Create namespace without runner + // Create namespace and start a runner with 1 slot let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; - // Create actor (should be pending) - let res = common::api::public::actors_create( + let runner_full = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder + .with_total_slots(1) + .with_actor_behavior("filler-actor", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + .with_actor_behavior("test-actor", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + }) + .await; + + tracing::info!("runner with 1 slot started"); + + // Fill the slot with a filler actor + let filler_res = create_actor( ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: None, - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, + &namespace, + "filler-actor", + runner_full.name(), + rivet_types::actors::CrashPolicy::Destroy, ) - .await - .expect("failed to create actor"); + .await; + + let filler_actor_id = filler_res.actor.actor_id.to_string(); + + // Wait for filler actor to be allocated + loop { + if runner_full.has_actor(&filler_actor_id).await { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + } + + tracing::info!( + ?filler_actor_id, + "filler actor allocated, runner is now full" + ); + + // Create test actor (should be pending because runner is full) + let res = create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "test-actor", + runner_full.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; let actor_id = res.actor.actor_id.to_string(); @@ -528,19 +881,23 @@ fn actor_pending_allocation_no_runners() { tracing::info!(?actor_id, "actor is pending allocation"); - // Now start a runner - let runner = common::setup_runner( - ctx.leader_dc(), - &namespace, - &format!("key-{:012x}", rand::random::()), - 1, - 20, - None, - ) + // Now start a runner with available slots + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("test-actor", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + }) .await; - // Wait for allocation - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + tracing::info!("runner with 20 slots started"); + + // Poll for allocation + loop { + if runner.has_actor(&actor_id).await { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + } // Verify actor is now allocated assert!( @@ -550,37 +907,47 @@ fn actor_pending_allocation_no_runners() { tracing::info!( ?actor_id, - "actor successfully allocated after runner started" + "actor successfully allocated after runner with slots started" ); }); } #[test] -#[ignore] fn pending_allocation_queue_ordering() { common::run(common::TestOpts::new(1), |ctx| async move { - // Create namespace without runner + // Create namespace and start runner with only 2 slots let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder + .with_total_slots(2) + .with_actor_behavior("test-actor-0", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + .with_actor_behavior("test-actor-1", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + .with_actor_behavior("test-actor-2", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + }) + .await; + + tracing::info!("runner with 2 slots started"); + // Create 3 actors in sequence + // First 2 should be allocated immediately, 3rd should be pending let mut actor_ids = Vec::new(); for i in 0..3 { - let res = common::api::public::actors_create( + let name = format!("test-actor-{}", i); + let res = create_actor( ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: format!("test-actor-{}", i), - key: None, - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, + &namespace, + &name, + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, ) - .await - .expect("failed to create actor"); + .await; actor_ids.push(res.actor.actor_id.to_string()); @@ -588,19 +955,17 @@ fn pending_allocation_queue_ordering() { tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; } - // Start runner with only 2 slots - let runner = common::setup_runner( - ctx.leader_dc(), - &namespace, - &format!("key-{:012x}", rand::random::()), - 1, - 2, // Only 2 slots - None, - ) - .await; + // Poll for first 2 actors to be allocated + loop { + let has_0 = runner.has_actor(&actor_ids[0]).await; + let has_1 = runner.has_actor(&actor_ids[1]).await; + + if has_0 && has_1 { + break; + } - // Wait for allocation - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + } // Verify first 2 actors are allocated (FIFO) assert!( @@ -628,483 +993,317 @@ fn pending_allocation_queue_ordering() { }); } +// MARK: Runner Failures #[test] -#[ignore] -fn actor_allocation_prefers_available_runner() { - // TODO: Implement when we can test with multiple runners -} - -// MARK: 8. Key Reservation and Uniqueness -#[test] -fn key_reservation_single_datacenter() { - common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, _runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; - - let key = common::generate_unique_key(); - - // Create first actor with key - let res1 = common::api::public::actors_create( - ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: Some(key.clone()), - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, - ) - .await - .expect("failed to create first actor"); - - let actor_id1 = res1.actor.actor_id.to_string(); - - tracing::info!(?actor_id1, ?key, "first actor created with key"); - - // Destroy first actor - common::api::public::actors_delete( - ctx.leader_dc().guard_port(), - common::api_types::actors::delete::DeletePath { - actor_id: actor_id1.parse().expect("failed to parse actor_id"), - }, - common::api_types::actors::delete::DeleteQuery { - namespace: Some(namespace.clone()), - }, - ) - .await - .expect("failed to delete first actor"); - - // Wait for destroy and key release - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - - // Create second actor with same key (should succeed now) - let res2 = common::api::public::actors_create( - ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: Some(key.clone()), - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, - ) - .await - .expect("failed to create second actor after key release"); - - let actor_id2 = res2.actor.actor_id.to_string(); - - assert_ne!( - actor_id1, actor_id2, - "second actor should have different ID" - ); - - tracing::info!( - ?actor_id2, - ?key, - "second actor created with same key after first destroyed" - ); - }); -} - -#[test] -fn actor_lookup_by_key() { - common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, _runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; - - let key = common::generate_unique_key(); - - // Create actor with key - let res = common::api::public::actors_create( - ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: Some(key.clone()), - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, - ) - .await - .expect("failed to create actor"); - - let actor_id = res.actor.actor_id.to_string(); - - // Query actor by key (name is required when using key) - let list_res = common::api::public::actors_list( - ctx.leader_dc().guard_port(), - common::api_types::actors::list::ListQuery { - actor_ids: None, - actor_id: vec![], - namespace: namespace.clone(), - name: Some("test-actor".to_string()), - key: Some(key.clone()), - include_destroyed: Some(false), - limit: None, - cursor: None, - }, - ) - .await - .expect("failed to list actors"); - - assert_eq!(list_res.actors.len(), 1, "should find exactly one actor"); - assert_eq!( - list_res.actors[0].actor_id.to_string(), - actor_id, - "should find the correct actor by key" - ); - - tracing::info!(?actor_id, ?key, "actor successfully looked up by key"); - }); -} - -// MARK: 9. Serverless Integration -#[test] -#[ignore] -fn serverless_slot_tracking() { - // TODO: Implement when serverless infrastructure is available -} +fn actor_survives_runner_disconnect() { + common::run( + common::TestOpts::new_with_timeout(1, 60), + |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + // Create runner and start actor + let (start_tx, start_rx) = tokio::sync::oneshot::channel(); + let start_tx = Arc::new(Mutex::new(Some(start_tx))); + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("test-actor", move |_| { + Box::new(common::test_runner::NotifyOnStartActor::new( + start_tx.clone(), + )) + }) + }) + .await; + + let res = create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "test-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Restart, + ) + .await; -// MARK: 10. Actor Data and State -#[test] -#[ignore] -fn actor_kv_data_lifecycle() { - // TODO: Implement when KV data can be tested -} + let actor_id_str = res.actor.actor_id.to_string(); -// MARK: Edge Cases - 1. Runner Failures -#[test] -#[ignore] -fn actor_survives_runner_disconnect() { - // TODO: Implement when we can simulate runner disconnects + // Wait for actor to start + start_rx + .await + .expect("actor should have sent start notification"); + + tracing::info!(?actor_id_str, "actor started, simulating runner disconnect"); + + // Simulate runner disconnect by shutting down + runner.shutdown().await; + + tracing::info!( + "runner disconnected, waiting for system to detect and apply crash policy" + ); + + // Now we wait for runner_lost_threshold so that actor state updates + tokio::time::sleep(tokio::time::Duration::from_millis( + ctx.leader_dc() + .config + .pegboard() + .runner_lost_threshold() + .try_into() + .unwrap(), + )) + .await; + + // Poll for actor to be rescheduled (crash policy is Restart) + // The system should detect runner loss and apply the crash policy + let start = std::time::Instant::now(); + let actor = loop { + let actor = + common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id_str, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); + tracing::warn!(?actor); + // Actor should be waiting for an allocation after runner loss + if actor.pending_allocation_ts.is_some() { + break actor; + } + + if start.elapsed() > std::time::Duration::from_secs(50) { + // TODO: Always times out here + tracing::info!(?actor); + break actor; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + }; + + assert!( + actor.pending_allocation_ts.is_some(), + "actor should be pending allocation after runner disconnected and threshold hit with restart policy" + ); + assert!( + actor.connectable_ts.is_none(), + "actor should not be connectable after runner disconnect" + ); + }, + ); } +// MARK: Resource Limits #[test] #[ignore] -fn runner_reconnect_with_stale_actors() { - // TODO: Implement when we can simulate runner reconnection with stale state -} - -// MARK: Edge Cases - 2. Concurrent Operations -#[test] -fn concurrent_key_reservation() { - common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, _runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; - - let key = common::generate_unique_key(); - let port = ctx.leader_dc().guard_port(); - let namespace_clone = namespace.clone(); - - // Launch two concurrent create requests with the same key - let handle1 = tokio::spawn({ - let key = key.clone(); - let namespace = namespace_clone.clone(); - async move { - common::api::public::actors_create( - port, - common::api_types::actors::create::CreateQuery { namespace }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: Some(key), - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, - ) - .await - } - }); - - let handle2 = tokio::spawn({ - let key = key.clone(); - let namespace = namespace_clone.clone(); - async move { - common::api::public::actors_create( - port, - common::api_types::actors::create::CreateQuery { namespace }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: Some(key), - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, +fn runner_at_max_capacity() { + common::run( + common::TestOpts::new_with_timeout(1, 30), + |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + // Start runner with only 2 slots + + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder + .with_total_slots(2) + .with_actor_behavior("test-actor", move |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + }) + .await; + + // Create first two actors to fill capacity + let mut actor_ids = Vec::new(); + for _i in 0..2 { + let res = create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "test-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, ) - .await - } - }); + .await; - let (res1, res2) = tokio::join!(handle1, handle2); + actor_ids.push(res.actor.actor_id.to_string()); + } - // Exactly one should succeed and one should fail - let success_count = [res1, res2] - .iter() - .filter(|r| r.as_ref().unwrap().is_ok()) - .count(); + // Poll for both actors to be allocated + loop { + let has_0 = runner.has_actor(&actor_ids[0]).await; + let has_1 = runner.has_actor(&actor_ids[1]).await; - assert_eq!( - success_count, 1, - "exactly one concurrent creation should succeed" - ); + if has_0 && has_1 { + break; + } - tracing::info!(?key, "concurrent key reservation handled correctly"); - }); -} - -#[test] -#[ignore] -fn concurrent_destroy_and_wake() { - // TODO: Implement when sleep/wake is available -} - -#[test] -fn concurrent_create_with_same_key_destroy() { - common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, _runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + } - let key = common::generate_unique_key(); + // Verify both actors are allocated + assert!(runner.has_actor(&actor_ids[0]).await); + assert!(runner.has_actor(&actor_ids[1]).await); - // Create first actor - let res1 = common::api::public::actors_create( - ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: Some(key.clone()), - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, - ) - .await - .expect("failed to create first actor"); - - let actor_id1 = res1.actor.actor_id.to_string(); - - // Start destroying - let delete_handle = tokio::spawn({ - let port = ctx.leader_dc().guard_port(); - let namespace = namespace.clone(); - let actor_id = actor_id1.clone(); - async move { - common::api::public::actors_delete( - port, - common::api_types::actors::delete::DeletePath { - actor_id: actor_id.parse().unwrap(), - }, - common::api_types::actors::delete::DeleteQuery { - namespace: Some(namespace), - }, - ) - .await - } - }); + // Create third actor (should be pending) + let res3 = create_actor( + ctx.leader_dc().guard_port(), + &namespace, + "test-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Destroy, + ) + .await; - // Small delay then try to create with same key - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + let actor_id3 = res3.actor.actor_id.to_string(); - // Try to create second actor - should eventually succeed after destroy completes - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + // Verify third actor is pending + let actor3 = + common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id3, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); - let _res2 = common::api::public::actors_create( - ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor".to_string(), - key: Some(key.clone()), - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, - ) - .await - .expect("should succeed creating with same key after destroy"); + assert!( + actor3.pending_allocation_ts.is_some(), + "third actor should be pending when runner at capacity" + ); - delete_handle + // Destroy first actor to free a slot + common::api::public::actors_delete( + ctx.leader_dc().guard_port(), + common::api_types::actors::delete::DeletePath { + actor_id: actor_ids[0].parse().unwrap(), + }, + common::api_types::actors::delete::DeleteQuery { + namespace: Some(namespace.clone()), + }, + ) .await - .expect("delete should complete") - .expect("delete should succeed"); + .expect("failed to delete actor"); + + // Poll for third actor to be allocated (wait for slot to free and pending actor to be allocated) + loop { + tracing::warn!( + "polling runner: current actors: {:?}", + runner.get_actor_ids().await + ); + if runner.has_actor(&actor_id3).await { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } - tracing::info!("key reuse after destroy works correctly"); - }); + // Verify third actor is now allocated + assert!( + runner.has_actor(&actor_id3).await, + "pending actor should be allocated after slot freed" + ); + }, + ); } -// MARK: Edge Cases - 3. Resource Limits +// MARK: Timeout and Retry Scenarios #[test] -fn runner_at_max_capacity() { +fn exponential_backoff_max_retries() { common::run(common::TestOpts::new(1), |ctx| async move { let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; - // Start runner with only 2 slots - let runner = common::setup_runner( - ctx.leader_dc(), - &namespace, - &format!("key-{:012x}", rand::random::()), - 1, - 2, // Only 2 slots - None, - ) - .await; - - // Create first two actors to fill capacity - let mut actor_ids = Vec::new(); - for i in 0..2 { - let res = common::api::public::actors_create( - ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: format!("test-actor-{}", i), - key: None, - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, - ) - .await - .expect("failed to create actor"); - - actor_ids.push(res.actor.actor_id.to_string()); - } + // Create test runner with always-crashing actor - // Wait for allocation - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + let runner = common::setup_runner(ctx.leader_dc(), &namespace, |builder| { + builder.with_actor_behavior("crash-always-actor", move |_| { + Box::new(common::test_runner::CrashOnStartActor::new(1)) + }) + }) + .await; - // Verify both actors are allocated - assert!(runner.has_actor(&actor_ids[0]).await); - assert!(runner.has_actor(&actor_ids[1]).await); + tracing::info!("test runner ready, creating actor that will always crash"); - // Create third actor (should be pending) - let res3 = common::api::public::actors_create( + // Create actor with restart crash policy + let res = create_actor( ctx.leader_dc().guard_port(), - common::api_types::actors::create::CreateQuery { - namespace: namespace.clone(), - }, - common::api_types::actors::create::CreateRequest { - datacenter: None, - name: "test-actor-3".to_string(), - key: None, - input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), - crash_policy: rivet_types::actors::CrashPolicy::Destroy, - }, + &namespace, + "crash-always-actor", + runner.name(), + rivet_types::actors::CrashPolicy::Restart, ) - .await - .expect("failed to create third actor"); - - let actor_id3 = res3.actor.actor_id.to_string(); - - // Wait a bit - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - - // Verify third actor is pending - let actor3 = common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id3, &namespace) - .await - .expect("failed to get actor") - .expect("actor should exist"); + .await; - assert!( - actor3.pending_allocation_ts.is_some(), - "third actor should be pending when runner at capacity" - ); + let actor_id_str = res.actor.actor_id.to_string(); + + tracing::info!(?actor_id_str, "actor created, will crash repeatedly"); + + // Track reschedule timestamps to verify backoff increases + let mut previous_reschedule_ts: Option = None; + let mut backoff_deltas = Vec::new(); + + // Poll for multiple crashes and verify backoff increases + for iteration in 0..5 { + let actor = loop { + let actor = + common::try_get_actor(ctx.leader_dc().guard_port(), &actor_id_str, &namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); + + if actor.reschedule_ts.is_some() { + break actor; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + }; + + let current_reschedule_ts = actor.reschedule_ts.expect("reschedule_ts should be set"); + + tracing::info!( + iteration, + reschedule_ts = current_reschedule_ts, + "actor has reschedule_ts after crash" + ); + + // Calculate backoff delta if we have a previous timestamp + if let Some(prev_ts) = previous_reschedule_ts { + let delta = current_reschedule_ts - prev_ts; + backoff_deltas.push(delta); + tracing::info!( + iteration, + delta_ms = delta, + "backoff delta from previous reschedule" + ); + } - // Destroy first actor to free a slot - common::api::public::actors_delete( - ctx.leader_dc().guard_port(), - common::api_types::actors::delete::DeletePath { - actor_id: actor_ids[0].parse().unwrap(), - }, - common::api_types::actors::delete::DeleteQuery { - namespace: Some(namespace.clone()), - }, - ) - .await - .expect("failed to delete actor"); + previous_reschedule_ts = Some(current_reschedule_ts); + + // Wait for the reschedule time to pass so next crash can happen + let now = rivet_util::timestamp::now(); + if current_reschedule_ts > now { + let wait_duration = (current_reschedule_ts - now) as u64; + tracing::info!( + wait_duration_ms = wait_duration, + "waiting for reschedule time" + ); + tokio::time::sleep(tokio::time::Duration::from_millis(wait_duration + 100)).await; + } + } - // Wait for slot to free and pending actor to be allocated - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + // Verify that backoff intervals generally increase (exponential backoff) + // We expect each delta to be larger than or equal to the previous + // (allowing some tolerance for system timing) + for i in 1..backoff_deltas.len() { + tracing::info!( + iteration = i, + current_delta = backoff_deltas[i], + previous_delta = backoff_deltas[i - 1], + "comparing backoff deltas" + ); + + // Allow some tolerance: current should be >= 80% of expected growth + // (exponential backoff typically doubles, but we allow for some variance) + assert!( + backoff_deltas[i] >= backoff_deltas[i - 1] / 2, + "backoff should not decrease significantly: iteration {}, prev={}, curr={}", + i, + backoff_deltas[i - 1], + backoff_deltas[i] + ); + } - // Verify third actor is now allocated - assert!( - runner.has_actor(&actor_id3).await, - "pending actor should be allocated after slot freed" + tracing::info!( + ?backoff_deltas, + "exponential backoff verified across multiple crashes" ); - - tracing::info!("runner capacity and pending allocation verified"); }); } - -// MARK: Edge Cases - 4. Timeout and Retry Scenarios -#[test] -#[ignore] -fn exponential_backoff_max_retries() { - // TODO: Implement when crash simulation is available -} - -#[test] -#[ignore] -fn gc_timeout_start_threshold() { - // TODO: Implement when we can control actor start timing -} - -#[test] -#[ignore] -fn gc_timeout_stop_threshold() { - // TODO: Implement when we can control actor stop timing -} - -// MARK: Edge Cases - 5. Data Consistency -#[test] -#[ignore] -fn actor_state_persistence_across_reschedule() { - // TODO: Implement when crash/reschedule is testable -} - -#[test] -#[ignore] -fn index_consistency_after_failure() { - // TODO: Implement when we have failure injection capabilities -} - -// MARK: Edge Cases - 6. Protocol Edge Cases -#[test] -#[ignore] -fn duplicate_actor_state_running_events() { - // TODO: Implement when we can send duplicate protocol events -} - -#[test] -#[ignore] -fn actor_state_stopped_before_running() { - // TODO: Implement when we can control protocol event ordering -} - -#[test] -#[ignore] -fn runner_ack_command_failures() { - // TODO: Implement when we can simulate ack failures -} diff --git a/engine/packages/engine/tests/api_actors_create.rs b/engine/packages/engine/tests/api_actors_create.rs index f734b22280..6ce1c50632 100644 --- a/engine/packages/engine/tests/api_actors_create.rs +++ b/engine/packages/engine/tests/api_actors_create.rs @@ -17,7 +17,7 @@ fn create_actor_valid_namespace() { name: "test-actor".to_string(), key: None, input: None, - runner_name_selector: common::TEST_RUNNER_NAME.to_string(), + runner_name_selector: runner.name().to_string(), crash_policy: rivet_types::actors::CrashPolicy::Destroy, }, ) @@ -27,6 +27,9 @@ fn create_actor_valid_namespace() { common::assert_actor_exists(ctx.leader_dc().guard_port(), &actor_id, &namespace).await; + // TODO: Hook into engine instead of sleep + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + assert!( runner.has_actor(&actor_id).await, "runner should have the actor" diff --git a/engine/packages/engine/tests/api_actors_delete.rs b/engine/packages/engine/tests/api_actors_delete.rs index 72329721b8..0a115fc955 100644 --- a/engine/packages/engine/tests/api_actors_delete.rs +++ b/engine/packages/engine/tests/api_actors_delete.rs @@ -116,14 +116,12 @@ fn delete_actor_current_datacenter() { ) .await .expect("failed to create actor"); - let actor_id = res.actor.actor_id.to_string(); + let actor_id = res.actor.actor_id; // Delete the actor common::api::public::actors_delete( ctx.leader_dc().guard_port(), - common::api_types::actors::delete::DeletePath { - actor_id: actor_id.parse().expect("failed to parse actor_id"), - }, + common::api_types::actors::delete::DeletePath { actor_id: actor_id }, common::api_types::actors::delete::DeleteQuery { namespace: Some(namespace.clone()), }, @@ -132,8 +130,12 @@ fn delete_actor_current_datacenter() { .expect("failed to delete actor"); // Verify actor is destroyed - common::assert_actor_is_destroyed(ctx.leader_dc().guard_port(), &actor_id, &namespace) - .await; + common::assert_actor_is_destroyed( + ctx.leader_dc().guard_port(), + &actor_id.to_string(), + &namespace, + ) + .await; }); } diff --git a/engine/packages/engine/tests/api_actors_get_or_create.rs b/engine/packages/engine/tests/api_actors_get_or_create.rs index 966b61f378..fab7befbed 100644 --- a/engine/packages/engine/tests/api_actors_get_or_create.rs +++ b/engine/packages/engine/tests/api_actors_get_or_create.rs @@ -360,15 +360,20 @@ fn get_or_create_race_condition_across_datacenters() { let (namespace, _, _runner) = common::setup_test_namespace_with_runner(ctx.leader_dc()).await; - let _runner2 = common::setup_runner( - ctx.get_dc(2), - &namespace, - &format!("key-{:012x}", rand::random::()), - 1, - 20, - Some(DC2_RUNNER_NAME.to_string()), - ) - .await; + let mut _runner2 = common::test_runner::TestRunnerBuilder::new(&namespace) + .with_runner_key(&format!("key-{:012x}", rand::random::())) + .with_version(1) + .with_total_slots(20) + .with_runner_name(DC2_RUNNER_NAME) + .with_actor_behavior("test-actor", |_config| { + Box::new(common::test_runner::EchoActor::new()) + }) + .build(ctx.get_dc(2)) + .await + .expect("failed to build test runner"); + + _runner2.start().await.expect("failed to start runner"); + _runner2.wait_ready().await; let actor_name = "cross-dc-race-actor"; let actor_key = "cross-dc-race-key"; diff --git a/engine/packages/engine/tests/api_actors_list.rs b/engine/packages/engine/tests/api_actors_list.rs index b3bdca0a7b..566c453195 100644 --- a/engine/packages/engine/tests/api_actors_list.rs +++ b/engine/packages/engine/tests/api_actors_list.rs @@ -636,33 +636,6 @@ fn list_with_non_existent_namespace() { }); } -#[test] -fn list_with_both_actor_ids_and_name() { - common::run(common::TestOpts::new(1), |ctx| async move { - let (namespace, _, _runner) = - common::setup_test_namespace_with_runner(ctx.leader_dc()).await; - - // Try to list with both actor_ids and name (validation error) - let res = common::api::public::actors_list( - ctx.leader_dc().guard_port(), - common::api_types::actors::list::ListQuery { - namespace: namespace.clone(), - name: Some("test-actor".to_string()), - key: None, - actor_id: common::convert_strings_to_ids(vec!["some-id".to_string()]), - actor_ids: None, - include_destroyed: None, - limit: None, - cursor: None, - }, - ) - .await; - - // Should fail with validation error - assert!(res.is_err(), "Should return error for invalid parameters"); - }); -} - #[test] fn list_with_key_but_no_name() { common::run(common::TestOpts::new(1), |ctx| async move { @@ -1110,8 +1083,8 @@ fn list_with_invalid_actor_id_format_in_comma_list() { namespace: namespace.clone(), name: None, key: None, - actor_id: common::convert_strings_to_ids(mixed_ids), - actor_ids: None, + actor_id: None, + actor_ids: Some(mixed_ids.join(",")), include_destroyed: None, limit: None, cursor: None, diff --git a/engine/packages/engine/tests/api_namespaces_list.rs b/engine/packages/engine/tests/api_namespaces_list.rs index 1623abfa01..c4ad94b7f9 100644 --- a/engine/packages/engine/tests/api_namespaces_list.rs +++ b/engine/packages/engine/tests/api_namespaces_list.rs @@ -428,8 +428,8 @@ fn list_namespaces_filter_by_ids_empty_list() { ctx.leader_dc().guard_port(), rivet_api_types::namespaces::list::ListQuery { name: None, - namespace_id: vec![], - namespace_ids: Some("invalid,not-a-uuid,bad-id".to_string()), + namespace_id: vec![generate_dummy_rivet_id(), generate_dummy_rivet_id()], + namespace_ids: None, limit: None, cursor: None, }, @@ -437,6 +437,8 @@ fn list_namespaces_filter_by_ids_empty_list() { .await .expect("failed to list namespaces"); + tracing::info!(?response.namespaces, "received response"); + assert_eq!(response.namespaces.len(), 0, "Should return empty array"); }); } diff --git a/engine/packages/engine/tests/api_runner_configs_upsert.rs b/engine/packages/engine/tests/api_runner_configs_upsert.rs index ceb25fc2a9..074c0d7fe9 100644 --- a/engine/packages/engine/tests/api_runner_configs_upsert.rs +++ b/engine/packages/engine/tests/api_runner_configs_upsert.rs @@ -580,3 +580,49 @@ fn upsert_runner_config_idempotent() { assert!(response2.endpoint_config_changed || !response2.endpoint_config_changed); }); } + +// MARK: Runner validation tests + +#[test] +fn upsert_runner_config_serverless_slots_per_runner_zero() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, _) = common::setup_test_namespace(ctx.leader_dc()).await; + + let runner_name = "zero-slots-runner"; + let mut datacenters = HashMap::new(); + datacenters.insert( + "dc-1".to_string(), + rivet_api_types::namespaces::runner_configs::RunnerConfig { + kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Serverless { + url: "http://example.com".to_string(), + headers: None, + request_lifespan: 30, + slots_per_runner: 0, // Invalid: should be rejected + min_runners: Some(1), + max_runners: 5, + runners_margin: Some(2), + }, + metadata: None, + }, + ); + + // Attempt to upsert runner config with slots_per_runner = 0 + let result = common::api::public::runner_configs_upsert( + ctx.leader_dc().guard_port(), + rivet_api_peer::runner_configs::UpsertPath { + runner_name: runner_name.to_string(), + }, + rivet_api_peer::runner_configs::UpsertQuery { + namespace: namespace.clone(), + }, + rivet_api_public::runner_configs::upsert::UpsertRequest { datacenters }, + ) + .await; + + // Should fail because slots_per_runner cannot be 0 + assert!( + result.is_err(), + "Upsert should fail when slots_per_runner is 0" + ); + }); +} diff --git a/engine/packages/engine/tests/common/ctx.rs b/engine/packages/engine/tests/common/ctx.rs index b2dde3c996..4669a1221e 100644 --- a/engine/packages/engine/tests/common/ctx.rs +++ b/engine/packages/engine/tests/common/ctx.rs @@ -7,11 +7,22 @@ use super::api; pub struct TestOpts { pub datacenter_count: usize, + pub timeout_secs: u64, } impl TestOpts { pub fn new(datacenter_count: usize) -> Self { - Self { datacenter_count } + Self { + datacenter_count, + ..Default::default() + } + } + + pub fn new_with_timeout(datacenter_count: usize, timeout_secs: u64) -> Self { + Self { + datacenter_count, + timeout_secs, + } } } @@ -19,6 +30,7 @@ impl Default for TestOpts { fn default() -> Self { Self { datacenter_count: 1, + timeout_secs: 10, } } } @@ -37,11 +49,6 @@ pub struct TestDatacenter { } impl TestCtx { - /// Creates a test context with multiple datacenters - pub async fn new_multi(dc_count: usize) -> Result { - Self::new_with_opts(TestOpts::new(dc_count)).await - } - /// Creates a test context with custom options pub async fn new_with_opts(opts: TestOpts) -> Result { // Set up logging diff --git a/engine/packages/engine/tests/common/mod.rs b/engine/packages/engine/tests/common/mod.rs index 3db9d5aefe..8c9b100f13 100644 --- a/engine/packages/engine/tests/common/mod.rs +++ b/engine/packages/engine/tests/common/mod.rs @@ -3,14 +3,15 @@ pub mod actors; pub mod api; pub mod ctx; -pub mod runner; pub mod test_helpers; +pub mod test_runner; pub use actors::*; pub use ctx::*; pub use rivet_api_types as api_types; -pub use runner::TEST_RUNNER_NAME; +pub const TEST_RUNNER_NAME: &'static str = "test-runner"; pub use test_helpers::*; +pub use test_runner::*; use std::future::Future; use std::time::Duration; @@ -21,10 +22,10 @@ where Fut: Future, { let runtime = tokio::runtime::Runtime::new().expect("failed to build runtime"); - runtime.block_on(async { + let timeout = Duration::from_secs(opts.timeout_secs); let ctx = TestCtx::new_with_opts(opts).await.expect("build testctx"); - tokio::time::timeout(Duration::from_secs(10), test_fn(ctx)) + tokio::time::timeout(timeout, test_fn(ctx)) .await .expect("test timed out"); }); diff --git a/engine/packages/engine/tests/common/runner.rs b/engine/packages/engine/tests/common/runner.rs deleted file mode 100644 index fa69a3a072..0000000000 --- a/engine/packages/engine/tests/common/runner.rs +++ /dev/null @@ -1,146 +0,0 @@ -use std::{path::Path, time::Duration}; - -use rivet_util::Id; -use tokio::process::{Child, Command}; - -pub const TEST_RUNNER_NAME: &'static str = "test-runner"; - -pub struct TestRunner { - pub runner_id: Id, - internal_port: u16, - pub port: u16, - handle: Child, -} - -impl TestRunner { - pub async fn new( - port: u16, - namespace_name: &str, - key: &str, - version: u32, - total_slots: u32, - runner_name: Option, - ) -> Self { - let internal_server_port = portpicker::pick_unused_port().expect("runner http server port"); - let http_server_port = portpicker::pick_unused_port().expect("runner http server port"); - - tracing::info!(?internal_server_port, ?http_server_port, "starting runner"); - - let manifest_dir = env!("CARGO_MANIFEST_DIR"); - let runner_script_path = - Path::new(manifest_dir).join("../../sdks/typescript/test-runner/dist/index.js"); - - if !runner_script_path.exists() { - panic!( - "Runner script not found at '{}'. Build it first with `pnpm install && pnpm build -F @rivetkit/engine-test-runner`.", - runner_script_path.display(), - ); - } - - tracing::info!(?runner_script_path, "spawning runner process"); - - let handle = Command::new("node") - .arg(runner_script_path) - .env("INTERNAL_SERVER_PORT", internal_server_port.to_string()) - .env("RIVET_NAMESPACE", namespace_name) - .env("RIVET_RUNNER_KEY", key.to_string()) - .env("RIVET_RUNNER_VERSION", version.to_string()) - .env("RIVET_RUNNER_TOTAL_SLOTS", total_slots.to_string()) - .env( - "RIVET_RUNNER_NAME", - runner_name.unwrap_or(TEST_RUNNER_NAME.to_string()), - ) - .env("RIVET_ENDPOINT", format!("http://127.0.0.1:{port}")) - // Uncomment for runner logs - // .env("LOG_LEVEL", "DEBUG") - // .stdout(std::process::Stdio::inherit()) - .kill_on_drop(true) - .spawn() - .expect("Failed to execute runner js file, node not installed"); - - let runner_id = Self::wait_ready(internal_server_port).await; - tokio::time::sleep(Duration::from_millis(500)).await; - TestRunner { - runner_id, - internal_port: internal_server_port, - port: http_server_port, - handle, - } - } - - async fn wait_ready(port: u16) -> Id { - let client = reqwest::Client::new(); - let mut attempts = 0; - - loop { - let res = client - .get(format!("http://127.0.0.1:{port}/wait-ready")) - .send() - .await; - - let response = match res { - Ok(x) => x, - Err(err) => { - if attempts < 10 { - attempts += 1; - tokio::time::sleep(Duration::from_millis(150)).await; - continue; - } else { - Err(err).expect("Failed to send wait ready request to runner") - } - } - }; - - if !response.status().is_success() { - if attempts < 10 { - attempts += 1; - tokio::time::sleep(Duration::from_millis(150)).await; - continue; - } - - let text = response.text().await.expect("Failed to read response text"); - panic!("Failed to wait ready for runner: {text}"); - } - - return response - .json() - .await - .expect("Failed to parse JSON response"); - } - } - - pub async fn has_actor(&self, actor_id: &str) -> bool { - let client = reqwest::Client::new(); - let response = client - .get(format!("http://127.0.0.1:{}/has-actor", self.internal_port)) - .query(&[("actor", actor_id)]) - .send() - .await - .expect("Failed to send request has-actor to runner"); - - if response.status() == reqwest::StatusCode::NOT_FOUND { - return false; - } - - if response.status().is_success() { - return true; - } - - let text = response.text().await.expect("Failed to fetch has-actor"); - panic!("Failed to fetch has-actor: {text}"); - } - - pub async fn shutdown(&self) { - let client = reqwest::Client::new(); - let response = client - .get(format!("http://127.0.0.1:{}/shutdown", self.internal_port)) - .send() - .await - .expect("Failed to send shutdown request to runner"); - - if !response.status().is_success() { - let text = response.text().await.expect("Failed to read response text"); - panic!("Failed to shutdown runner: {text}"); - } - } -} diff --git a/engine/packages/engine/tests/common/test_helpers.rs b/engine/packages/engine/tests/common/test_helpers.rs index 2de2c7e6db..9303d28469 100644 --- a/engine/packages/engine/tests/common/test_helpers.rs +++ b/engine/packages/engine/tests/common/test_helpers.rs @@ -23,41 +23,19 @@ pub async fn setup_test_namespace(leader_dc: &TestDatacenter) -> (String, rivet_ // Setup namespace with runner pub async fn setup_test_namespace_with_runner( dc: &super::TestDatacenter, -) -> (String, rivet_util::Id, super::runner::TestRunner) { +) -> (String, rivet_util::Id, super::test_runner::TestRunner) { let (namespace_name, namespace_id) = setup_test_namespace(dc).await; - let runner = setup_runner( - dc, - &namespace_name, - &format!("key-{:012x}", rand::random::()), - 1, - 20, - None, - ) + let runner = setup_runner(dc, &namespace_name, |builder| { + builder.with_actor_behavior("test-actor", |_config| { + Box::new(super::test_runner::EchoActor::new()) + }) + }) .await; (namespace_name, namespace_id, runner) } -pub async fn setup_runner( - dc: &super::TestDatacenter, - namespace_name: &str, - key: &str, - version: u32, - total_slots: u32, - runner_name: Option, -) -> super::runner::TestRunner { - super::runner::TestRunner::new( - dc.guard_port(), - &namespace_name, - key, - version, - total_slots, - runner_name, - ) - .await -} - pub async fn cleanup_test_namespace(namespace_id: rivet_util::Id, _guard_port: u16) { // TODO: implement namespace deletion when available tracing::info!(?namespace_id, "namespace cleanup (not implemented)"); @@ -183,10 +161,62 @@ pub fn get_test_datacenter_name(label: u16) -> String { format!("dc-{}", label) } -pub async fn setup_multi_datacenter_test() -> super::TestCtx { - super::TestCtx::new_multi(2) +// Timing helpers for eventual consistency +pub async fn wait_for_eventual_consistency() { + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; +} + +pub async fn wait_for_actor_propagation(actor_id: &str, _generation: u32) { + // Wait for actor state to propagate through all systems + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; +} + +// Test runner helper functions + +/// Build a test runner with specified configuration +/// +/// Defaults to 20 total slots, but can be overridden in the builder closure. +/// +/// Example: +/// ``` +/// // Default 20 slots +/// let runner = setup_runner(ctx.leader_dc(), &namespace, |builder| { +/// builder +/// .with_actor_behavior("test-actor", |_| Box::new(EchoActor::new())) +/// .with_actor_behavior("crash-actor", |_| Box::new(CrashOnStartActor::new(1))) +/// }).await; +/// +/// // Override slots +/// let runner = setup_runner(ctx.leader_dc(), &namespace, |builder| { +/// builder +/// .with_total_slots(2) +/// .with_actor_behavior("test-actor", |_| Box::new(EchoActor::new())) +/// }).await; +/// ``` +pub async fn setup_runner( + dc: &super::TestDatacenter, + namespace: &str, + configure: F, +) -> super::test_runner::TestRunner +where + F: FnOnce(super::test_runner::TestRunnerBuilder) -> super::test_runner::TestRunnerBuilder, +{ + let builder = super::test_runner::TestRunnerBuilder::new(namespace) + .with_runner_key(&format!("key-{:012x}", rand::random::())) + .with_version(1) + .with_total_slots(20); + + let builder = configure(builder); + + let runner = builder + .build(dc) .await - .expect("Failed to setup multi-datacenter test") + .expect("failed to build test runner"); + + runner.start().await.expect("failed to start runner"); + runner.wait_ready().await; + + runner } pub fn convert_strings_to_ids(actor_ids: Vec) -> Vec { @@ -195,3 +225,7 @@ pub fn convert_strings_to_ids(actor_ids: Vec) -> Vec { .map(|x| rivet_util::Id::from_str(&x).expect("failed to convert actor ids to string")) .collect::>() } + +pub fn generate_dummy_rivet_id(dc: &super::TestDatacenter) -> rivet_util::Id { + rivet_util::Id::new_v1(dc.config.dc_label()) +} diff --git a/engine/packages/engine/tests/common/test_runner/actor.rs b/engine/packages/engine/tests/common/test_runner/actor.rs new file mode 100644 index 0000000000..c0141fa7b4 --- /dev/null +++ b/engine/packages/engine/tests/common/test_runner/actor.rs @@ -0,0 +1,282 @@ +use anyhow::*; +use async_trait::async_trait; +use rivet_runner_protocol as rp; +use std::time::Duration; +use tokio::sync::{mpsc, oneshot}; + +use super::protocol; + +/// Configuration passed to actor when it starts +#[derive(Clone)] +pub struct ActorConfig { + pub actor_id: String, + pub generation: u32, + pub name: String, + pub key: Option, + pub create_ts: i64, + pub input: Option>, + + /// Channel to send events to the runner + pub event_tx: mpsc::UnboundedSender, + + /// Channel to send KV requests to the runner + pub kv_request_tx: mpsc::UnboundedSender, +} + +impl From<&rp::ActorConfig> for ActorConfig { + fn from(config: &rp::ActorConfig) -> Self { + // Create dummy channels (will be replaced by runner) + let (event_tx, _) = mpsc::unbounded_channel(); + let (kv_request_tx, _) = mpsc::unbounded_channel(); + + ActorConfig { + actor_id: String::new(), // Will be set by runner + generation: 0, // Will be set by runner + name: config.name.clone(), + key: config.key.clone(), + create_ts: config.create_ts, + input: config.input.as_ref().map(|i| i.to_vec()), + event_tx, + kv_request_tx, + } + } +} + +impl ActorConfig { + /// Send a sleep intent + pub fn send_sleep_intent(&self) { + let event = protocol::make_actor_intent( + &self.actor_id, + self.generation, + rp::ActorIntent::ActorIntentSleep, + ); + self.send_event(event); + } + + /// Send a stop intent + pub fn send_stop_intent(&self) { + let event = protocol::make_actor_intent( + &self.actor_id, + self.generation, + rp::ActorIntent::ActorIntentStop, + ); + self.send_event(event); + } + + /// Set an alarm to wake at specified timestamp (milliseconds) + pub fn send_set_alarm(&self, alarm_ts: i64) { + let event = protocol::make_set_alarm(&self.actor_id, self.generation, Some(alarm_ts)); + self.send_event(event); + } + + /// Clear the alarm + pub fn send_clear_alarm(&self) { + let event = protocol::make_set_alarm(&self.actor_id, self.generation, None); + self.send_event(event); + } + + /// Send a custom event + fn send_event(&self, event: rp::Event) { + let actor_event = ActorEvent { + actor_id: self.actor_id.clone(), + generation: self.generation, + event, + }; + let _ = self.event_tx.send(actor_event); + } + + /// Send a KV get request + pub async fn send_kv_get(&self, keys: Vec>) -> Result { + let (response_tx, response_rx) = oneshot::channel(); + let request = KvRequest { + actor_id: self.actor_id.clone(), + data: rp::KvRequestData::KvGetRequest(rp::KvGetRequest { keys }), + response_tx, + }; + self.kv_request_tx + .send(request) + .map_err(|_| anyhow!("failed to send KV get request"))?; + let response: rp::KvResponseData = response_rx + .await + .map_err(|_| anyhow!("KV get request response channel closed"))?; + + match response { + rp::KvResponseData::KvGetResponse(data) => Ok(data), + rp::KvResponseData::KvErrorResponse(err) => { + Err(anyhow!("KV get failed: {}", err.message)) + } + _ => Err(anyhow!("unexpected response type for KV get")), + } + } + + /// Send a KV list request + pub async fn send_kv_list( + &self, + query: rp::KvListQuery, + reverse: Option, + limit: Option, + ) -> Result { + let (response_tx, response_rx) = oneshot::channel(); + let request = KvRequest { + actor_id: self.actor_id.clone(), + data: rp::KvRequestData::KvListRequest(rp::KvListRequest { + query, + reverse, + limit, + }), + response_tx, + }; + self.kv_request_tx + .send(request) + .map_err(|_| anyhow!("failed to send KV list request"))?; + let response: rp::KvResponseData = response_rx + .await + .map_err(|_| anyhow!("KV list request response channel closed"))?; + + match response { + rp::KvResponseData::KvListResponse(data) => Ok(data), + rp::KvResponseData::KvErrorResponse(err) => { + Err(anyhow!("KV list failed: {}", err.message)) + } + _ => Err(anyhow!("unexpected response type for KV list")), + } + } + + /// Send a KV put request + pub async fn send_kv_put(&self, keys: Vec>, values: Vec>) -> Result<()> { + let (response_tx, response_rx) = oneshot::channel(); + let request = KvRequest { + actor_id: self.actor_id.clone(), + data: rp::KvRequestData::KvPutRequest(rp::KvPutRequest { keys, values }), + response_tx, + }; + self.kv_request_tx + .send(request) + .map_err(|_| anyhow!("failed to send KV put request"))?; + let response: rp::KvResponseData = response_rx + .await + .map_err(|_| anyhow!("KV put request response channel closed"))?; + + match response { + rp::KvResponseData::KvPutResponse => Ok(()), + rp::KvResponseData::KvErrorResponse(err) => { + Err(anyhow!("KV put failed: {}", err.message)) + } + _ => Err(anyhow!("unexpected response type for KV put")), + } + } + + /// Send a KV delete request + pub async fn send_kv_delete(&self, keys: Vec>) -> Result<()> { + let (response_tx, response_rx) = oneshot::channel(); + let request = KvRequest { + actor_id: self.actor_id.clone(), + data: rp::KvRequestData::KvDeleteRequest(rp::KvDeleteRequest { keys }), + response_tx, + }; + self.kv_request_tx + .send(request) + .map_err(|_| anyhow!("failed to send KV delete request"))?; + let response: rp::KvResponseData = response_rx + .await + .map_err(|_| anyhow!("KV delete request response channel closed"))?; + + match response { + rp::KvResponseData::KvDeleteResponse => Ok(()), + rp::KvResponseData::KvErrorResponse(err) => { + Err(anyhow!("KV delete failed: {}", err.message)) + } + _ => Err(anyhow!("unexpected response type for KV delete")), + } + } + + /// Send a KV drop request + pub async fn send_kv_drop(&self) -> Result<()> { + let (response_tx, response_rx) = oneshot::channel(); + let request = KvRequest { + actor_id: self.actor_id.clone(), + data: rp::KvRequestData::KvDropRequest, + response_tx, + }; + self.kv_request_tx + .send(request) + .map_err(|_| anyhow!("failed to send KV drop request"))?; + let response: rp::KvResponseData = response_rx + .await + .map_err(|_| anyhow!("KV drop request response channel closed"))?; + + match response { + rp::KvResponseData::KvDropResponse => Ok(()), + rp::KvResponseData::KvErrorResponse(err) => { + Err(anyhow!("KV drop failed: {}", err.message)) + } + _ => Err(anyhow!("unexpected response type for KV drop")), + } + } +} + +/// Result of actor start operation +#[derive(Debug, Clone)] +pub enum ActorStartResult { + /// Send ActorStateRunning immediately + Running, + /// Wait specified duration before sending running + Delay(Duration), + /// Never send running (simulates timeout) + Timeout, + /// Crash immediately with exit code + Crash { code: i32, message: String }, +} + +/// Result of actor stop operation +#[derive(Debug, Clone)] +pub enum ActorStopResult { + /// Stop successfully (exit code 0) + Success, + /// Wait before stopping + Delay(Duration), + /// Crash with exit code + Crash { code: i32, message: String }, +} + +/// Trait for test actors that can be controlled programmatically +#[async_trait] +pub trait TestActor: Send + Sync { + /// Called when actor receives start command + async fn on_start(&mut self, config: ActorConfig) -> Result; + + /// Called when actor receives stop command + async fn on_stop(&mut self) -> Result; + + /// Called when actor receives alarm wake signal + async fn on_alarm(&mut self) -> Result<()> { + tracing::debug!("actor received alarm (default no-op)"); + Ok(()) + } + + /// Called when actor receives wake signal (from sleep) + async fn on_wake(&mut self) -> Result<()> { + tracing::debug!("actor received wake (default no-op)"); + Ok(()) + } + + /// Get actor's name for logging + fn name(&self) -> &str { + "TestActor" + } +} + +/// Events that actors can send directly via the event channel +#[derive(Debug, Clone)] +pub struct ActorEvent { + pub actor_id: String, + pub generation: u32, + pub event: rp::Event, +} + +/// KV requests that actors can send to the runner +pub struct KvRequest { + pub actor_id: String, + pub data: rp::KvRequestData, + pub response_tx: oneshot::Sender, +} diff --git a/engine/packages/engine/tests/common/test_runner/behaviors.rs b/engine/packages/engine/tests/common/test_runner/behaviors.rs new file mode 100644 index 0000000000..bd92e5f127 --- /dev/null +++ b/engine/packages/engine/tests/common/test_runner/behaviors.rs @@ -0,0 +1,401 @@ +use super::actor::*; +use anyhow::*; +use async_trait::async_trait; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; + +/// Simple echo actor that responds successfully and does nothing special +pub struct EchoActor; + +impl EchoActor { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl TestActor for EchoActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!(actor_id = ?config.actor_id, generation = config.generation, "echo actor started"); + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + tracing::info!("echo actor stopped"); + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "EchoActor" + } +} + +/// Actor that crashes immediately on start with specified exit code +pub struct CrashOnStartActor { + pub exit_code: i32, + pub message: String, + notify_tx: Option>>>>, +} + +impl CrashOnStartActor { + pub fn new(exit_code: i32) -> Self { + Self { + exit_code, + message: format!("crash on start with code {}", exit_code), + notify_tx: None, + } + } + + pub fn new_with_notify( + exit_code: i32, + notify_tx: std::sync::Arc>>>, + ) -> Self { + Self { + exit_code, + message: format!("crash on start with code {}", exit_code), + notify_tx: Some(notify_tx), + } + } +} + +#[async_trait] +impl TestActor for CrashOnStartActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::warn!( + actor_id = ?config.actor_id, + generation = config.generation, + exit_code = self.exit_code, + "crash on start actor crashing" + ); + + // Notify before crashing + if let Some(notify_tx) = &self.notify_tx { + let mut guard = notify_tx.lock().expect("failed to lock notify_tx"); + if let Some(tx) = guard.take() { + let _ = tx.send(()); + } + } + + Ok(ActorStartResult::Crash { + code: self.exit_code, + message: self.message.clone(), + }) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "CrashOnStartActor" + } +} + +/// Actor that delays before sending running state +pub struct DelayedStartActor { + pub delay: Duration, +} + +impl DelayedStartActor { + pub fn new(delay: Duration) -> Self { + Self { delay } + } +} + +#[async_trait] +impl TestActor for DelayedStartActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!( + actor_id = ?config.actor_id, + generation = config.generation, + delay_ms = self.delay.as_millis(), + "delayed start actor will delay before running" + ); + Ok(ActorStartResult::Delay(self.delay)) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "DelayedStartActor" + } +} + +/// Actor that never sends running state (simulates timeout) +pub struct TimeoutActor; + +impl TimeoutActor { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl TestActor for TimeoutActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::warn!( + actor_id = ?config.actor_id, + generation = config.generation, + "timeout actor will never send running state" + ); + Ok(ActorStartResult::Timeout) + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "TimeoutActor" + } +} + +/// Actor that sends sleep intent immediately after starting +pub struct SleepImmediatelyActor { + notify_tx: Option>>>>, +} + +impl SleepImmediatelyActor { + pub fn new() -> Self { + Self { notify_tx: None } + } + + pub fn new_with_notify( + notify_tx: std::sync::Arc>>>, + ) -> Self { + Self { + notify_tx: Some(notify_tx), + } + } +} + +#[async_trait] +impl TestActor for SleepImmediatelyActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!( + actor_id = ?config.actor_id, + generation = config.generation, + "sleep immediately actor started, sending sleep intent" + ); + + // Send sleep intent immediately + config.send_sleep_intent(); + + // Notify that we're sending sleep intent + if let Some(notify_tx) = &self.notify_tx { + let mut guard = notify_tx.lock().expect("failed to lock notify_tx"); + if let Some(tx) = guard.take() { + let _ = tx.send(()); + } + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + tracing::info!("sleep immediately actor stopped"); + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "SleepImmediatelyActor" + } +} + +/// Actor that sends stop intent immediately after starting +pub struct StopImmediatelyActor; + +impl StopImmediatelyActor { + pub fn new() -> Self { + Self + } +} + +#[async_trait] +impl TestActor for StopImmediatelyActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!( + actor_id = ?config.actor_id, + generation = config.generation, + "stop immediately actor started, sending stop intent" + ); + + // Send stop intent immediately + config.send_stop_intent(); + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + tracing::info!("stop immediately actor stopped gracefully"); + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "StopImmediatelyActor" + } +} + +/// Actor that crashes N times then succeeds +/// Used to test crash policy restart with retry reset on success +pub struct CrashNTimesThenSucceedActor { + crash_count: Arc>, + max_crashes: usize, +} + +impl CrashNTimesThenSucceedActor { + pub fn new(max_crashes: usize, crash_count: Arc>) -> Self { + Self { + crash_count, + max_crashes, + } + } +} + +#[async_trait] +impl TestActor for CrashNTimesThenSucceedActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + let mut count = self.crash_count.lock().unwrap(); + let current = *count; + + if current < self.max_crashes { + *count += 1; + tracing::warn!( + actor_id = ?config.actor_id, + generation = config.generation, + crash_count = current + 1, + max_crashes = self.max_crashes, + "crashing (will succeed after more crashes)" + ); + Ok(ActorStartResult::Crash { + code: 1, + message: format!("crash {} of {}", current + 1, self.max_crashes), + }) + } else { + tracing::info!( + actor_id = ?config.actor_id, + generation = config.generation, + crash_count = current, + "succeeded after crashes" + ); + Ok(ActorStartResult::Running) + } + } + + async fn on_stop(&mut self) -> Result { + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "CrashNTimesThenSucceedActor" + } +} + +/// Actor that notifies via a oneshot channel when it starts running +/// This allows tests to wait for the actor to actually start instead of sleeping +pub struct NotifyOnStartActor { + notify_tx: std::sync::Arc>>>, +} + +impl NotifyOnStartActor { + pub fn new( + notify_tx: std::sync::Arc>>>, + ) -> Self { + Self { notify_tx } + } +} + +#[async_trait] +impl TestActor for NotifyOnStartActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!( + actor_id = ?config.actor_id, + generation = config.generation, + "notify on start actor started, sending notification" + ); + + // Send notification that actor has started + let mut guard = self.notify_tx.lock().expect("failed to lock notify_tx"); + if let Some(tx) = guard.take() { + let _ = tx.send(()); + } + + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + tracing::info!("notify on start actor stopped"); + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "NotifyOnStartActor" + } +} + +/// Actor that verifies it received the expected input data +/// Crashes if input doesn't match or is missing, succeeds if it matches +pub struct VerifyInputActor { + expected_input: Vec, +} + +impl VerifyInputActor { + pub fn new(expected_input: Vec) -> Self { + Self { expected_input } + } +} + +#[async_trait] +impl TestActor for VerifyInputActor { + async fn on_start(&mut self, config: ActorConfig) -> Result { + tracing::info!( + actor_id = ?config.actor_id, + generation = config.generation, + expected_input_size = self.expected_input.len(), + received_input_size = config.input.as_ref().map(|i| i.len()), + "verify input actor started, checking input" + ); + + // Check if input is present + let Some(received_input) = &config.input else { + tracing::error!("no input data received"); + return Ok(ActorStartResult::Crash { + code: 1, + message: "no input data received".to_string(), + }); + }; + + // Check if input matches expected + if received_input != &self.expected_input { + tracing::error!( + expected_len = self.expected_input.len(), + received_len = received_input.len(), + "input data mismatch" + ); + return Ok(ActorStartResult::Crash { + code: 1, + message: format!( + "input mismatch: expected {} bytes, got {} bytes", + self.expected_input.len(), + received_input.len() + ), + }); + } + + tracing::info!("input data verified successfully"); + Ok(ActorStartResult::Running) + } + + async fn on_stop(&mut self) -> Result { + tracing::info!("verify input actor stopped"); + Ok(ActorStopResult::Success) + } + + fn name(&self) -> &str { + "VerifyInputActor" + } +} diff --git a/engine/packages/engine/tests/common/test_runner/mod.rs b/engine/packages/engine/tests/common/test_runner/mod.rs new file mode 100644 index 0000000000..15a810adf2 --- /dev/null +++ b/engine/packages/engine/tests/common/test_runner/mod.rs @@ -0,0 +1,21 @@ +//! Rust-based test runner for deep actor lifecycle testing. +//! +//! This module provides a pure Rust implementation of a runner that can be fully controlled +//! from tests, allowing simulation of: +//! - Actor crashes with specific exit codes +//! - Protocol timing issues (delays, timeouts) +//! - Custom protocol events (sleep, alarms, etc.) +//! - Runner disconnection/reconnection scenarios + +mod actor; +mod behaviors; +mod protocol; +mod runner; + +pub use actor::{ActorConfig, ActorStartResult, ActorStopResult, TestActor}; +pub use behaviors::{ + CrashNTimesThenSucceedActor, CrashOnStartActor, DelayedStartActor, EchoActor, + NotifyOnStartActor, SleepImmediatelyActor, StopImmediatelyActor, TimeoutActor, + VerifyInputActor, +}; +pub use runner::{TestRunner, TestRunnerBuilder}; diff --git a/engine/packages/engine/tests/common/test_runner/protocol.rs b/engine/packages/engine/tests/common/test_runner/protocol.rs new file mode 100644 index 0000000000..33deeb9980 --- /dev/null +++ b/engine/packages/engine/tests/common/test_runner/protocol.rs @@ -0,0 +1,57 @@ +use anyhow::*; +use rivet_runner_protocol as rp; +use vbare::OwnedVersionedData; + +pub const PROTOCOL_VERSION: u16 = rp::PROTOCOL_MK1_VERSION; + +/// Helper to decode messages from server +pub fn decode_to_client(buf: &[u8], protocol_version: u16) -> Result { + // Use versioned deserialization to handle protocol version properly + ::deserialize(buf, protocol_version) +} + +/// Helper to encode messages to server +pub fn encode_to_server(msg: rp::ToServer) -> Vec { + rp::versioned::ToServer::wrap_latest(msg) + .serialize(PROTOCOL_VERSION) + .expect("failed to serialize ToServer") +} + +/// Helper to create event wrapper with index +pub fn make_event_wrapper(index: u64, event: rp::Event) -> rp::EventWrapper { + rp::EventWrapper { + index: index as i64, + inner: event, + } +} + +/// Helper to create actor state update event +pub fn make_actor_state_update( + actor_id: &str, + generation: u32, + state: rp::ActorState, +) -> rp::Event { + rp::Event::EventActorStateUpdate(rp::EventActorStateUpdate { + actor_id: actor_id.to_string(), + generation, + state, + }) +} + +/// Helper to create actor intent event +pub fn make_actor_intent(actor_id: &str, generation: u32, intent: rp::ActorIntent) -> rp::Event { + rp::Event::EventActorIntent(rp::EventActorIntent { + actor_id: actor_id.to_string(), + generation, + intent, + }) +} + +/// Helper to create set alarm event +pub fn make_set_alarm(actor_id: &str, generation: u32, alarm_ts: Option) -> rp::Event { + rp::Event::EventActorSetAlarm(rp::EventActorSetAlarm { + actor_id: actor_id.to_string(), + generation, + alarm_ts, + }) +} diff --git a/engine/packages/engine/tests/common/test_runner/runner.rs b/engine/packages/engine/tests/common/test_runner/runner.rs new file mode 100644 index 0000000000..2b69fc610f --- /dev/null +++ b/engine/packages/engine/tests/common/test_runner/runner.rs @@ -0,0 +1,825 @@ +use super::{actor::*, protocol}; +use anyhow::*; +use futures_util::{SinkExt, StreamExt}; +use rivet_runner_protocol as rp; +use rivet_util::Id; +use std::{ + collections::HashMap, + str::FromStr, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + time::Duration, +}; +use tokio::sync::{Mutex, mpsc, oneshot}; +use tokio_tungstenite::{ + connect_async, + tungstenite::{self, Message}, +}; + +use super::actor::KvRequest; + +const RUNNER_PING_INTERVAL: Duration = Duration::from_secs(15); + +type ActorFactory = Arc Box + Send + Sync>; +type WsStream = + tokio_tungstenite::WebSocketStream>; + +/// Configuration for test runner +#[derive(Clone)] +struct Config { + namespace: String, + runner_name: String, + runner_key: String, + version: u32, + total_slots: u32, + endpoint: String, + token: String, + actor_factories: HashMap, +} + +/// Test runner for actor lifecycle testing +pub struct TestRunner { + config: Config, + + // State + pub runner_id: Arc>>, + actors: Arc>>, + last_command_idx: Arc>, + next_event_idx: Arc>, + event_history: Arc>>, + shutdown: Arc, + + // Event channel for actors to push events + event_tx: mpsc::UnboundedSender, + event_rx: Arc>>, + + // KV request channel for actors to send KV requests + kv_request_tx: mpsc::UnboundedSender, + kv_request_rx: Arc>>, + next_kv_request_id: Arc>, + kv_pending_requests: Arc>>>, + + // Shutdown channel + shutdown_tx: Arc>>>, +} + +struct ActorState { + actor_id: String, + generation: u32, + actor: Box, +} + +/// Builder for test runner +pub struct TestRunnerBuilder { + namespace: String, + runner_name: String, + runner_key: String, + version: u32, + total_slots: u32, + actor_factories: HashMap, +} + +impl TestRunnerBuilder { + pub fn new(namespace: &str) -> Self { + Self { + namespace: namespace.to_string(), + runner_name: "test-runner".to_string(), + runner_key: format!("key-{:012x}", rand::random::()), + version: 1, + total_slots: 100, + actor_factories: HashMap::new(), + } + } + + pub fn with_runner_name(mut self, name: &str) -> Self { + self.runner_name = name.to_string(); + self + } + + pub fn with_runner_key(mut self, key: &str) -> Self { + self.runner_key = key.to_string(); + self + } + + pub fn with_version(mut self, version: u32) -> Self { + self.version = version; + self + } + + pub fn with_total_slots(mut self, total_slots: u32) -> Self { + self.total_slots = total_slots; + self + } + + /// Register an actor factory for a specific actor name + pub fn with_actor_behavior(mut self, actor_name: &str, factory: F) -> Self + where + F: Fn(ActorConfig) -> Box + Send + Sync + 'static, + { + self.actor_factories + .insert(actor_name.to_string(), Arc::new(factory)); + self + } + + pub async fn build(self, dc: &super::super::TestDatacenter) -> Result { + let endpoint = format!("http://127.0.0.1:{}", dc.guard_port()); + let token = "dev".to_string(); + + let config = Config { + namespace: self.namespace, + runner_name: self.runner_name, + runner_key: self.runner_key, + version: self.version, + total_slots: self.total_slots, + endpoint, + token, + actor_factories: self.actor_factories, + }; + + // Create event channel for actors to push events + let (event_tx, event_rx) = mpsc::unbounded_channel(); + + // Create KV request channel for actors to send KV requests + let (kv_request_tx, kv_request_rx) = mpsc::unbounded_channel(); + + Ok(TestRunner { + config, + runner_id: Arc::new(Mutex::new(None)), + actors: Arc::new(Mutex::new(HashMap::new())), + last_command_idx: Arc::new(Mutex::new(-1)), + next_event_idx: Arc::new(Mutex::new(0)), + event_history: Arc::new(Mutex::new(Vec::new())), + shutdown: Arc::new(AtomicBool::new(false)), + event_tx, + event_rx: Arc::new(Mutex::new(event_rx)), + kv_request_tx, + kv_request_rx: Arc::new(Mutex::new(kv_request_rx)), + next_kv_request_id: Arc::new(Mutex::new(0)), + kv_pending_requests: Arc::new(Mutex::new(HashMap::new())), + shutdown_tx: Arc::new(Mutex::new(None)), + }) + } +} + +impl TestRunner { + /// Start the test runner + pub async fn start(&self) -> Result<()> { + tracing::info!( + namespace = %self.config.namespace, + runner_name = %self.config.runner_name, + runner_key = %self.config.runner_key, + "starting test runner" + ); + + let ws_url = self.build_ws_url(); + + tracing::debug!(ws_url = %ws_url, "connecting to pegboard"); + + // Connect to WebSocket with protocols + let token_protocol = format!("rivet_token.{}", self.config.token); + + // Build the request properly with all WebSocket headers + use tokio_tungstenite::tungstenite::client::IntoClientRequest; + let mut request = ws_url + .into_client_request() + .context("failed to build WebSocket request")?; + + // Add the Sec-WebSocket-Protocol header + request.headers_mut().insert( + "Sec-WebSocket-Protocol", + format!("rivet, {}", token_protocol).parse().unwrap(), + ); + + let (ws_stream, _response) = connect_async(request) + .await + .context("failed to connect to WebSocket")?; + + tracing::info!("websocket connected"); + + // Create shutdown channel + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + *self.shutdown_tx.lock().await = Some(shutdown_tx); + + // Clone self for the spawned task + let runner = self.clone_for_task(); + + tokio::spawn(async move { + if let Err(err) = runner.run_message_loop(ws_stream, shutdown_rx).await { + tracing::error!(?err, "test runner message loop failed"); + } + }); + + Ok(()) + } + + /// Clone the runner for passing to async tasks + fn clone_for_task(&self) -> Self { + Self { + config: self.config.clone(), + runner_id: self.runner_id.clone(), + actors: self.actors.clone(), + last_command_idx: self.last_command_idx.clone(), + next_event_idx: self.next_event_idx.clone(), + event_history: self.event_history.clone(), + shutdown: self.shutdown.clone(), + event_tx: self.event_tx.clone(), + event_rx: self.event_rx.clone(), + kv_request_tx: self.kv_request_tx.clone(), + kv_request_rx: self.kv_request_rx.clone(), + next_kv_request_id: self.next_kv_request_id.clone(), + kv_pending_requests: self.kv_pending_requests.clone(), + shutdown_tx: self.shutdown_tx.clone(), + } + } + + /// Wait for runner to be ready and return runner ID + pub async fn wait_ready(&self) -> String { + // Poll until runner_id is set + loop { + let runner_id = self.runner_id.lock().await; + if let Some(id) = runner_id.as_ref() { + return id.clone(); + } + drop(runner_id); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + + /// Check if runner has an actor + pub async fn has_actor(&self, actor_id: &str) -> bool { + let actors = self.actors.lock().await; + actors.contains_key(actor_id) + } + + /// Get runner's current actors + pub async fn get_actor_ids(&self) -> Vec { + let actors = self.actors.lock().await; + + actors + .keys() + .map(|x| Id::from_str(x).expect("failed to parse actor_id")) + .collect::>() + } + + pub fn name(&self) -> &str { + &self.config.runner_name + } + + /// Shutdown the runner + pub async fn shutdown(&self) { + tracing::info!("shutting down test runner"); + self.shutdown.store(true, Ordering::SeqCst); + + // Send shutdown signal to close ws_stream + if let Some(tx) = self.shutdown_tx.lock().await.take() { + let _ = tx.send(()); + } + } + + fn build_ws_url(&self) -> String { + let ws_endpoint = self.config.endpoint.replace("http://", "ws://"); + format!( + "{}/runners/connect?protocol_version={}&namespace={}&runner_key={}", + ws_endpoint.trim_end_matches('/'), + super::protocol::PROTOCOL_VERSION, + urlencoding::encode(&self.config.namespace), + urlencoding::encode(&self.config.runner_key) + ) + } + + async fn build_init_message(&self) -> rp::ToServer { + let last_command_idx = *self.last_command_idx.lock().await; + + rp::ToServer::ToServerInit(rp::ToServerInit { + name: self.config.runner_name.clone(), + version: self.config.version, + total_slots: self.config.total_slots, + last_command_idx: if last_command_idx >= 0 { + Some(last_command_idx) + } else { + None + }, + prepopulate_actor_names: None, + metadata: None, + }) + } + + async fn run_message_loop( + self, + mut ws_stream: WsStream, + mut shutdown_rx: oneshot::Receiver<()>, + ) -> Result<()> { + // Send init message + let init_msg = self.build_init_message().await; + let encoded = protocol::encode_to_server(init_msg); + ws_stream + .send(Message::Binary(encoded.into())) + .await + .context("failed to send init message")?; + + tracing::debug!("sent init message"); + + let mut ping_interval = tokio::time::interval(RUNNER_PING_INTERVAL); + // We lock here as these rx's are only for run_message_loop + let mut event_rx = self.event_rx.lock().await; + let mut kv_request_rx = self.kv_request_rx.lock().await; + + loop { + tokio::select! { + biased; + _ = &mut shutdown_rx => { + tracing::info!("received shutdown signal, closing websocket"); + let _ = ws_stream.close(None).await; + break; + } + + _ = ping_interval.tick() => { + if self.shutdown.load(Ordering::SeqCst) { + break; + } + + // Send ping + let ping = rp::ToServer::ToServerPing(rp::ToServerPing { + ts: chrono::Utc::now().timestamp_millis(), + }); + let encoded = protocol::encode_to_server(ping); + ws_stream.send(Message::Binary(encoded.into())).await?; + } + + // Listen for events pushed from actors + Some(actor_event) = event_rx.recv() => { + if self.shutdown.load(Ordering::SeqCst) { + break; + } + + tracing::debug!( + actor_id = ?actor_event.actor_id, + generation = actor_event.generation, + "received event from actor" + ); + + self.send_actor_event(&mut ws_stream, actor_event).await?; + } + + // Listen for KV requests from actors + Some(kv_request) = kv_request_rx.recv() => { + if self.shutdown.load(Ordering::SeqCst) { + break; + } + + tracing::debug!( + actor_id = ?kv_request.actor_id, + "received kv request from actor" + ); + + self.send_kv_request(&mut ws_stream, kv_request).await?; + } + + msg = ws_stream.next() => { + if self.shutdown.load(Ordering::SeqCst) { + break; + } + + match msg { + Some(std::result::Result::Ok(Message::Binary(buf))) => { + self.handle_message(&mut ws_stream, &buf).await?; + } + Some(std::result::Result::Ok(Message::Close(_))) => { + tracing::info!("websocket closed by server"); + break; + } + Some(std::result::Result::Err(err)) => { + tracing::error!(?err, "websocket error"); + return Err(err.into()); + } + None => { + tracing::info!("websocket stream ended"); + break; + } + _ => {} + } + } + } + } + + tracing::info!("test runner message loop exiting"); + Ok(()) + } + + /// Send an event pushed from an actor + async fn send_actor_event( + &self, + ws_stream: &mut WsStream, + actor_event: ActorEvent, + ) -> Result<()> { + let mut idx = self.next_event_idx.lock().await; + let event_wrapper = protocol::make_event_wrapper(*idx, actor_event.event); + *idx += 1; + drop(idx); + + self.event_history.lock().await.push(event_wrapper.clone()); + + tracing::debug!( + actor_id = ?actor_event.actor_id, + generation = actor_event.generation, + event_idx = event_wrapper.index, + "sending actor event" + ); + + let msg = rp::ToServer::ToServerEvents(vec![event_wrapper]); + let encoded = protocol::encode_to_server(msg); + ws_stream.send(Message::Binary(encoded.into())).await?; + + Ok(()) + } + + async fn handle_message(&self, ws_stream: &mut WsStream, buf: &[u8]) -> Result<()> { + let msg = protocol::decode_to_client(buf, super::protocol::PROTOCOL_VERSION)?; + + match msg { + rp::ToClient::ToClientInit(init) => { + self.handle_init(init, ws_stream).await?; + } + rp::ToClient::ToClientCommands(commands) => { + self.handle_commands(commands, ws_stream).await?; + } + rp::ToClient::ToClientAckEvents(ack) => { + self.handle_ack_events(ack).await; + } + rp::ToClient::ToClientKvResponse(response) => { + self.handle_kv_response(response).await; + } + _ => { + tracing::debug!(?msg, "ignoring message type"); + } + } + + Ok(()) + } + + async fn handle_init(&self, init: rp::ToClientInit, ws_stream: &mut WsStream) -> Result<()> { + tracing::info!( + runner_id = %init.runner_id, + last_event_idx = ?init.last_event_idx, + "received init from server" + ); + + *self.runner_id.lock().await = Some(init.runner_id.clone()); + + // Resend unacknowledged events + let events = self.event_history.lock().await; + let to_resend: Vec<_> = events + .iter() + .filter(|e| e.index > init.last_event_idx) + .cloned() + .collect(); + drop(events); + + if !to_resend.is_empty() { + tracing::info!(count = to_resend.len(), "resending unacknowledged events"); + let msg = rp::ToServer::ToServerEvents(to_resend); + let encoded = protocol::encode_to_server(msg); + ws_stream.send(Message::Binary(encoded.into())).await?; + } + + Ok(()) + } + + async fn handle_commands( + &self, + commands: Vec, + ws_stream: &mut WsStream, + ) -> Result<()> { + tracing::info!(count = commands.len(), "received commands"); + + for cmd_wrapper in commands { + tracing::debug!( + index = cmd_wrapper.index, + command = ?cmd_wrapper.inner, + "processing command" + ); + + match cmd_wrapper.inner { + rp::Command::CommandStartActor(start_cmd) => { + self.handle_start_actor(start_cmd, ws_stream).await?; + } + rp::Command::CommandStopActor(stop_cmd) => { + self.handle_stop_actor(stop_cmd, ws_stream).await?; + } + } + + *self.last_command_idx.lock().await = cmd_wrapper.index as i64; + } + + Ok(()) + } + + async fn handle_start_actor( + &self, + cmd: rp::CommandStartActor, + ws_stream: &mut WsStream, + ) -> Result<()> { + let actor_id = cmd.actor_id.clone(); + let generation = cmd.generation; + + tracing::info!(?actor_id, generation, name = %cmd.config.name, "starting actor"); + + // Create actor config + let mut config = ActorConfig::from(&cmd.config); + config.actor_id = actor_id.clone(); + config.generation = generation; + config.event_tx = self.event_tx.clone(); + config.kv_request_tx = self.kv_request_tx.clone(); + + // Get factory for this actor name + let factory = self + .config + .actor_factories + .get(&cmd.config.name) + .context(format!( + "no factory registered for actor name: {}", + cmd.config.name + ))?; + + // Create actor + let mut actor = factory(config.clone()); + + tracing::debug!( + ?actor_id, + generation, + actor_type = actor.name(), + "created actor instance" + ); + + // Call on_start + let start_result = actor + .on_start(config) + .await + .context("actor on_start failed")?; + + tracing::debug!( + ?actor_id, + generation, + ?start_result, + "actor on_start completed" + ); + + // Store actor + let actor_state = ActorState { + actor_id: actor_id.clone(), + generation, + actor, + }; + self.actors + .lock() + .await + .insert(actor_id.clone(), actor_state); + + // Handle start result + match start_result { + ActorStartResult::Running => { + self.send_actor_state_update( + &actor_id, + generation, + rp::ActorState::ActorStateRunning, + ws_stream, + ) + .await?; + } + ActorStartResult::Delay(duration) => { + //TODO: For now, we just wait synchronously. In the future, we could + // implement this with a channel-based event queue + tracing::info!( + ?actor_id, + generation, + delay_ms = duration.as_millis(), + "delaying before sending running state" + ); + tokio::time::sleep(duration).await; + self.send_actor_state_update( + &actor_id, + generation, + rp::ActorState::ActorStateRunning, + ws_stream, + ) + .await?; + } + ActorStartResult::Timeout => { + tracing::warn!( + ?actor_id, + generation, + "actor will timeout (not sending running)" + ); + // Don't send running state + } + ActorStartResult::Crash { code, message } => { + tracing::warn!(?actor_id, generation, code, %message, "actor crashed on start"); + self.send_actor_state_update( + &actor_id, + generation, + rp::ActorState::ActorStateStopped(rp::ActorStateStopped { + code: if code == 0 { + rp::StopCode::Ok + } else { + rp::StopCode::Error + }, + message: Some(message), + }), + ws_stream, + ) + .await?; + + // Remove actor + self.actors.lock().await.remove(&actor_id); + } + } + + Ok(()) + } + + async fn handle_stop_actor( + &self, + cmd: rp::CommandStopActor, + ws_stream: &mut WsStream, + ) -> Result<()> { + let actor_id = cmd.actor_id.clone(); + let generation = cmd.generation; + + tracing::info!(?actor_id, generation, "stopping actor"); + + // Get actor + let mut actors_guard = self.actors.lock().await; + let actor_state = actors_guard.get_mut(&actor_id).context("actor not found")?; + + // Call on_stop + let stop_result = actor_state + .actor + .on_stop() + .await + .context("actor on_stop failed")?; + + tracing::debug!( + ?actor_id, + generation, + ?stop_result, + "actor on_stop completed" + ); + + // Handle stop result + match stop_result { + ActorStopResult::Success => { + self.send_actor_state_update( + &actor_id, + generation, + rp::ActorState::ActorStateStopped(rp::ActorStateStopped { + code: rp::StopCode::Ok, + message: None, + }), + ws_stream, + ) + .await?; + } + ActorStopResult::Delay(duration) => { + tracing::info!(?actor_id, generation, ?duration, "delaying stop"); + tokio::time::sleep(duration).await; + self.send_actor_state_update( + &actor_id, + generation, + rp::ActorState::ActorStateStopped(rp::ActorStateStopped { + code: rp::StopCode::Ok, + message: None, + }), + ws_stream, + ) + .await?; + } + ActorStopResult::Crash { code, message } => { + tracing::warn!(?actor_id, generation, code, %message, "actor crashed on stop"); + self.send_actor_state_update( + &actor_id, + generation, + rp::ActorState::ActorStateStopped(rp::ActorStateStopped { + code: if code == 0 { + rp::StopCode::Ok + } else { + rp::StopCode::Error + }, + message: Some(message), + }), + ws_stream, + ) + .await?; + } + } + + // Remove actor + actors_guard.remove(&actor_id); + + Ok(()) + } + + async fn handle_ack_events(&self, ack: rp::ToClientAckEvents) { + let last_acked_idx = ack.last_event_idx; + + let mut events = self.event_history.lock().await; + let original_len = events.len(); + events.retain(|e| e.index > last_acked_idx); + + let pruned = original_len - events.len(); + if pruned > 0 { + tracing::debug!(last_acked_idx, pruned, "pruned acknowledged events"); + } + } + + async fn send_actor_state_update( + &self, + actor_id: &str, + generation: u32, + state: rp::ActorState, + ws_stream: &mut WsStream, + ) -> Result<()> { + let event = protocol::make_actor_state_update(actor_id, generation, state); + + let mut idx = self.next_event_idx.lock().await; + let event_wrapper = protocol::make_event_wrapper(*idx, event); + *idx += 1; + drop(idx); + + self.event_history.lock().await.push(event_wrapper.clone()); + + tracing::debug!( + ?actor_id, + generation, + event_idx = event_wrapper.index, + "sending actor state update" + ); + + let msg = rp::ToServer::ToServerEvents(vec![event_wrapper]); + let encoded = protocol::encode_to_server(msg); + ws_stream.send(Message::Binary(encoded.into())).await?; + + Ok(()) + } + + async fn send_kv_request(&self, ws_stream: &mut WsStream, kv_request: KvRequest) -> Result<()> { + let mut request_id = self.next_kv_request_id.lock().await; + let id = *request_id; + *request_id += 1; + drop(request_id); + + // Store the response channel + self.kv_pending_requests + .lock() + .await + .insert(id, kv_request.response_tx); + + tracing::debug!( + actor_id = ?kv_request.actor_id, + request_id = id, + "sending kv request" + ); + + let msg = rp::ToServer::ToServerKvRequest(rp::ToServerKvRequest { + actor_id: kv_request.actor_id, + request_id: id, + data: kv_request.data, + }); + let encoded = protocol::encode_to_server(msg); + ws_stream.send(Message::Binary(encoded.into())).await?; + + Ok(()) + } + + async fn handle_kv_response(&self, response: rp::ToClientKvResponse) { + let request_id = response.request_id; + + tracing::debug!(request_id, "received kv response"); + + let response_tx = self.kv_pending_requests.lock().await.remove(&request_id); + + if let Some(tx) = response_tx { + let _ = tx.send(response.data); + } else { + tracing::warn!(request_id, "received kv response for unknown request id"); + } + } +} + +impl Drop for TestRunner { + fn drop(&mut self) { + // Signal shutdown when runner is dropped + self.shutdown.store(true, Ordering::SeqCst); + tracing::debug!("test runner dropped, shutdown signaled"); + } +} + +// actor_graceful_stop_with_destroy_policy +// actor_sleep_intent +// actor_start_timeout +// crash_policy_destroy +// crash_policy_restart +// crash_policy_restart_resets_on_success +// crash_policy_sleep +// exponential_backoff_max_retries diff --git a/engine/packages/runtime/src/term_signal.rs b/engine/packages/runtime/src/term_signal.rs index b6ef0470bb..be2860fbad 100644 --- a/engine/packages/runtime/src/term_signal.rs +++ b/engine/packages/runtime/src/term_signal.rs @@ -13,7 +13,8 @@ use tokio::signal::windows::ctrl_c as windows_ctrl_c; const FORCE_CLOSE_THRESHOLD: usize = 3; -static HANDLER_CELL: OnceCell<(watch::Receiver, JoinHandle<()>)> = OnceCell::const_new(); +static HANDLER_CELL: OnceCell<(watch::Receiver, Option>)> = + OnceCell::const_new(); /// Cross-platform termination signal wrapper that handles: /// - Unix: SIGTERM and SIGINT @@ -88,9 +89,22 @@ impl TermSignal { .expect("failed initializing termination signal handler"); let rx = term_signal.tx.subscribe(); - let join_handle = tokio::spawn(term_signal.run()); - - std::future::ready((rx, join_handle)) + let tokio_join_handle = + if std::env::var("RIVET_TEST_RUNTIME") == Ok("1".to_string()) { + std::thread::spawn(|| { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to start term signal handler runtime for tests"); + + runtime.block_on(term_signal.run()); + }); + None + } else { + Some(tokio::spawn(term_signal.run())) + }; + + std::future::ready((rx, tokio_join_handle)) }) .await .0 @@ -101,13 +115,19 @@ impl TermSignal { /// Returns true if the user should abort any graceful attempt to shutdown and shutdown immediately. pub async fn recv(&mut self) -> bool { - let _ = self.0.changed().await; + let change = self.0.changed().await; + tracing::info!(?change, "hello?"); + *self.0.borrow() + // let _ = std::future::pending::<()>().await; + // false } pub fn stop() { - if let Some((_, join_handle)) = HANDLER_CELL.get() { - join_handle.abort(); + if let Some((_, tokio_join_handle)) = HANDLER_CELL.get() { + if let Some(tokio_join_handle) = tokio_join_handle { + tokio_join_handle.abort(); + } } } } diff --git a/engine/packages/test-deps/src/lib.rs b/engine/packages/test-deps/src/lib.rs index 842ab2488a..5a13877dbf 100644 --- a/engine/packages/test-deps/src/lib.rs +++ b/engine/packages/test-deps/src/lib.rs @@ -1,5 +1,6 @@ use anyhow::*; use futures_util::future; +use tokio::sync::OnceCell; use url::Url; mod datacenter; @@ -8,6 +9,20 @@ pub use datacenter::*; pub use rivet_test_deps_docker::*; use uuid::Uuid; +static IS_TEST_ENVIRON_SETUP: OnceCell<()> = OnceCell::const_new(); + +async fn set_test_environment_variables() { + IS_TEST_ENVIRON_SETUP + .get_or_init(async || { + // SAFETY: RIVET_TEST_RUNTIME is set before anything else + // happens in the test, and is initialized only once. + unsafe { + std::env::set_var("RIVET_TEST_RUNTIME", "1"); + }; + }) + .await; +} + pub struct TestDeps { pub pools: rivet_pools::Pools, pub config: rivet_config::Config, @@ -19,10 +34,12 @@ pub struct TestDeps { impl TestDeps { pub async fn new() -> Result { + set_test_environment_variables().await; TestDeps::new_with_test_id(Uuid::new_v4()).await } pub async fn new_with_test_id(test_id: Uuid) -> Result { + set_test_environment_variables().await; TestDeps::new_multi_with_test_id(&[1], test_id) .await? .into_iter() @@ -31,10 +48,12 @@ impl TestDeps { } pub async fn new_multi(dc_ids: &[u16]) -> Result> { + set_test_environment_variables().await; Self::new_multi_with_test_id(dc_ids, Uuid::new_v4()).await } pub async fn new_multi_with_test_id(dc_ids: &[u16], test_id: Uuid) -> Result> { + set_test_environment_variables().await; tracing::info!(?dc_ids, "setting up test dependencies"); let mut datacenters = Vec::with_capacity(dc_ids.len());