diff --git a/engine/packages/api-peer/src/actors/kv_get.rs b/engine/packages/api-peer/src/actors/kv_get.rs index 3ce45cf1d9..7f81aa7563 100644 --- a/engine/packages/api-peer/src/actors/kv_get.rs +++ b/engine/packages/api-peer/src/actors/kv_get.rs @@ -1,6 +1,6 @@ use anyhow::*; -use base64::prelude::BASE64_STANDARD; use base64::Engine; +use base64::prelude::BASE64_STANDARD; use pegboard_actor_kv as actor_kv; use rivet_api_builder::ApiCtx; use rivet_util::Id; diff --git a/engine/packages/api-public/src/actors/kv_get.rs b/engine/packages/api-public/src/actors/kv_get.rs index 5e9dd3b9db..0c1c4d79ca 100644 --- a/engine/packages/api-public/src/actors/kv_get.rs +++ b/engine/packages/api-public/src/actors/kv_get.rs @@ -65,7 +65,11 @@ async fn kv_get_inner(ctx: ApiCtx, path: KvGetPath) -> Result { request_remote_datacenter_raw( &ctx, path.actor_id.label(), - &format!("/actors/{}/kv/keys/{}", path.actor_id, urlencoding::encode(&path.key)), + &format!( + "/actors/{}/kv/keys/{}", + path.actor_id, + urlencoding::encode(&path.key) + ), axum::http::Method::GET, Option::<&()>::None, Option::<&()>::None, diff --git a/engine/packages/universaldb/src/utils/mod.rs b/engine/packages/universaldb/src/utils/mod.rs index 8b6d673c87..a81a3ba99b 100644 --- a/engine/packages/universaldb/src/utils/mod.rs +++ b/engine/packages/universaldb/src/utils/mod.rs @@ -24,13 +24,26 @@ pub enum IsolationLevel { #[derive(Debug, Clone, Copy)] pub struct MaybeCommitted(pub bool); +/// Calculate exponential backoff based on attempt. +/// +/// Ours: +/// 0 -> 10ms + 0-1ms jitter +/// 1 -> 20ms + 0-2ms jitter +/// 2 -> 40ms + 0-4ms jitter +/// ... +/// 7 (max) -> 1280ms + 0-128ms jitter +/// FDB (see https://github.com/apple/foundationdb/blob/b1fbbd87a794b7c6c2f456925c45d8af339a8ae0/fdbclient/NativeAPI.actor.cpp#L4333 and https://github.com/apple/foundationdb/blob/b1fbbd87a794b7c6c2f456925c45d8af339a8ae0/fdbclient/ClientKnobs.cpp#L74-L76): +/// 0 -> 10ms +/// 1 -> 20ms +/// 2 -> 40ms +/// ... +/// X -> max 1s pub fn calculate_tx_retry_backoff(attempt: usize) -> u64 { - // TODO: Update this to mirror fdb 1:1: - // https://github.com/apple/foundationdb/blob/21407341d9b49e1d343514a7a5f395bd5f232079/fdbclient/NativeAPI.actor.cpp#L3162 + let base = 2_u64.pow((attempt as u32).min(7)); + let base_backoff_ms = base * 10; - let base_backoff_ms = 2_u64.pow((attempt as u32).min(10)) * 10; - - let jitter_ms = rand::random::() % 100; + // Jitter is 0-10% of backoff ms + let jitter_ms = rand::random::() % base; base_backoff_ms + jitter_ms } diff --git a/engine/packages/universaldb/tests/rocksdb.rs b/engine/packages/universaldb/tests/rocksdb.rs index 814382c2ef..036cd712eb 100644 --- a/engine/packages/universaldb/tests/rocksdb.rs +++ b/engine/packages/universaldb/tests/rocksdb.rs @@ -4,7 +4,10 @@ use anyhow::Context; use futures_util::StreamExt; use rivet_test_deps_docker::TestDatabase; use rocksdb::{OptimisticTransactionDB, Options, WriteOptions}; -use universaldb::{Database, utils::IsolationLevel::*}; +use universaldb::{ + Database, + utils::{IsolationLevel::*, calculate_tx_retry_backoff}, +}; use uuid::Uuid; #[tokio::test] @@ -136,11 +139,3 @@ async fn rocksdb_udb() { }) .await; } - -pub fn calculate_tx_retry_backoff(attempt: usize) -> u64 { - let base_backoff_ms = 2_u64.pow((attempt as u32).min(10)) * 10; - - let jitter_ms = rand::random::() % 100; - - base_backoff_ms + jitter_ms -} diff --git a/engine/sdks/typescript/runner-protocol/src/index.ts b/engine/sdks/typescript/runner-protocol/src/index.ts index 69bc8d5e0b..b72f99d8f2 100644 --- a/engine/sdks/typescript/runner-protocol/src/index.ts +++ b/engine/sdks/typescript/runner-protocol/src/index.ts @@ -1,4 +1,4 @@ - +import assert from "node:assert" import * as bare from "@bare-ts/lib" const DEFAULT_CONFIG = /* @__PURE__ */ bare.Config({}) @@ -53,6 +53,9 @@ export function writeKvValue(bc: bare.ByteCursor, x: KvValue): void { export type KvMetadata = { readonly version: ArrayBuffer + /** + * TODO: Rename to update_ts + */ readonly createTs: i64 } @@ -1925,9 +1928,3 @@ export function decodeToServerlessServer(bytes: Uint8Array): ToServerlessServer } return result } - - -function assert(condition: boolean, message?: string): asserts condition { - if (!condition) throw new Error(message ?? "Assertion failed") -} -