From ac7e31ebf440080beb54fb00248550b75416fceb Mon Sep 17 00:00:00 2001 From: AI Bot Date: Wed, 15 Apr 2026 10:14:52 -0700 Subject: [PATCH 1/4] feat: add load-testing/stress-testing suite Add eventcore-stress, a binary crate for sustained concurrent load testing against all EventStore backends. Unlike the existing Criterion micro-benchmarks (eventcore-bench) which measure single-operation latency, this suite measures system behavior under concurrent load: throughput, tail latencies (p50/p95/p99), retry rates, and correctness. Five scenarios exercise different contention patterns: - contention: N tasks deposit to one shared stream (version conflicts) - throughput: per-task streams measuring raw store throughput - transfers: multi-stream atomic transfers with conservation checks - projection: batch-mode projection catch-up after concurrent writes - pool-saturation: postgres-only pool exhaustion under high concurrency The binary is never discovered by cargo nextest/test (no #[test] fns), compiles cleanly under cargo clippy --workspace, and is excluded from mutation testing. Run via: cargo run -p eventcore-stress -- <scenario> Also bumps rustls-webpki 0.103.10 -> 0.103.12 to resolve RUSTSEC-2026-0098 and RUSTSEC-2026-0099. 
--- .cargo/mutants.toml | 2 + Cargo.lock | 206 ++++++++++++- Cargo.toml | 1 + eventcore-stress/Cargo.toml | 41 +++ eventcore-stress/src/backends.rs | 27 ++ eventcore-stress/src/config.rs | 78 +++++ eventcore-stress/src/domain.rs | 221 ++++++++++++++ eventcore-stress/src/main.rs | 273 ++++++++++++++++++ eventcore-stress/src/metrics.rs | 164 +++++++++++ eventcore-stress/src/runner.rs | 133 +++++++++ eventcore-stress/src/scenarios/contention.rs | 119 ++++++++ eventcore-stress/src/scenarios/mod.rs | 5 + .../src/scenarios/pool_saturation.rs | 98 +++++++ eventcore-stress/src/scenarios/projection.rs | 217 ++++++++++++++ eventcore-stress/src/scenarios/throughput.rs | 98 +++++++ eventcore-stress/src/scenarios/transfers.rs | 150 ++++++++++ 16 files changed, 1828 insertions(+), 5 deletions(-) create mode 100644 eventcore-stress/Cargo.toml create mode 100644 eventcore-stress/src/backends.rs create mode 100644 eventcore-stress/src/config.rs create mode 100644 eventcore-stress/src/domain.rs create mode 100644 eventcore-stress/src/main.rs create mode 100644 eventcore-stress/src/metrics.rs create mode 100644 eventcore-stress/src/runner.rs create mode 100644 eventcore-stress/src/scenarios/contention.rs create mode 100644 eventcore-stress/src/scenarios/mod.rs create mode 100644 eventcore-stress/src/scenarios/pool_saturation.rs create mode 100644 eventcore-stress/src/scenarios/projection.rs create mode 100644 eventcore-stress/src/scenarios/throughput.rs create mode 100644 eventcore-stress/src/scenarios/transfers.rs diff --git a/.cargo/mutants.toml b/.cargo/mutants.toml index c180e30..48a7136 100644 --- a/.cargo/mutants.toml +++ b/.cargo/mutants.toml @@ -9,6 +9,8 @@ exclude_re = [ "impl EventReader for &T.*::read_after", # Benchmark crate has no behavioral tests "eventcore-bench/", + # Stress test binary has no behavioral tests + "eventcore-stress/", ] # Note: To enforce 100% mutation coverage locally, run: cargo mutants --workspace --check diff --git a/Cargo.lock b/Cargo.lock index 
57183d2..e95a3b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + [[package]] name = "ahash" version = "0.8.12" @@ -53,12 +59,56 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstream" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + [[package]] name = "anstyle" version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" +[[package]] +name = "anstyle-parse" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + [[package]] name = "anyhow" version = "1.0.102" @@ -89,6 +139,12 @@ version = "1.5.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -228,6 +284,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" dependencies = [ "clap_builder", + "clap_derive", ] [[package]] @@ -236,8 +293,22 @@ version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" dependencies = [ + "anstream", "anstyle", "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -246,6 +317,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -300,6 +377,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32fast" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" +dependencies = [ + "cfg-if", +] + 
[[package]] name = "criterion" version = "0.8.2" @@ -336,6 +422,15 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -596,6 +691,27 @@ dependencies = [ "uuid", ] +[[package]] +name = "eventcore-stress" +version = "0.7.0" +dependencies = [ + "clap", + "eventcore", + "eventcore-macros", + "eventcore-memory", + "eventcore-postgres", + "eventcore-sqlite", + "eventcore-types", + "hdrhistogram", + "nutype", + "rand 0.9.2", + "serde", + "serde_json", + "thiserror", + "tokio", + "uuid", +] + [[package]] name = "eventcore-testing" version = "0.7.0" @@ -647,6 +763,16 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "flate2" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "flume" version = "0.11.1" @@ -885,6 +1011,20 @@ dependencies = [ "hashbrown 0.15.5", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "crossbeam-channel", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "heck" version = "0.5.0" @@ -1069,6 +1209,12 @@ dependencies = [ "serde_core", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itertools" version = "0.13.0" @@ -1218,6 +1364,22 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", + "simd-adler32", +] + [[package]] name = "mio" version = "1.2.0" @@ -1235,6 +1397,16 @@ version = "0.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc0287524726960e07b119cebd01678f852f147742ae0d925e6a520dca956126" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -1320,6 +1492,12 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "oorandom" version = "11.1.5" @@ -1782,9 +1960,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.10" +version = "0.103.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" 
+checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06" dependencies = [ "ring", "rustls-pki-types", @@ -1947,6 +2125,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simd-adler32" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703d5c7ef118737c72f1af64ad2f6f8c5e1921f818cdcb97b8fe6fc69bf66214" + [[package]] name = "slab" version = "0.4.12" @@ -2010,7 +2194,7 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "chrono", "crc", @@ -2087,7 +2271,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" dependencies = [ "atoi", - "base64", + "base64 0.22.1", "bitflags", "byteorder", "bytes", @@ -2131,7 +2315,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" dependencies = [ "atoi", - "base64", + "base64 0.22.1", "bitflags", "byteorder", "chrono", @@ -2206,6 +2390,12 @@ dependencies = [ "unicode-properties", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "subtle" version = "2.6.1" @@ -2591,6 +2781,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.23.0" diff --git a/Cargo.toml b/Cargo.toml index da7f553..bbb49e7 
100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "eventcore-memory", "eventcore-postgres", "eventcore-sqlite", + "eventcore-stress", "eventcore-testing", "eventcore-types", ] diff --git a/eventcore-stress/Cargo.toml b/eventcore-stress/Cargo.toml new file mode 100644 index 0000000..9f2eeb7 --- /dev/null +++ b/eventcore-stress/Cargo.toml @@ -0,0 +1,41 @@ +[package] +name = "eventcore-stress" +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +description = "Load and stress testing tool for EventCore backends" +publish = false + +# Standalone lint config: workspace lints are too strict for stress test code. +# Binary crate compiles domain types that may appear dead from the binary's +# perspective, and stress test runners intentionally discard some results. +[lints.rust] +unsafe_code = "forbid" +dead_code = "allow" +unreachable_pub = "allow" +unused_results = "allow" + +[lints.clippy] +allow_attributes = "deny" + +[[bin]] +name = "eventcore-stress" +path = "src/main.rs" + +[dependencies] +clap = { version = "4", features = ["derive", "env"] } +eventcore = { path = "../eventcore", features = ["sqlite", "postgres"] } +eventcore-macros = { path = "../eventcore-macros" } +eventcore-memory = { path = "../eventcore-memory" } +eventcore-postgres = { path = "../eventcore-postgres" } +eventcore-sqlite = { path = "../eventcore-sqlite" } +eventcore-types = { path = "../eventcore-types" } +hdrhistogram = "7" +nutype = { version = "0.6.2", features = ["serde"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +thiserror = "2.0" +tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "sync"] } +uuid = { version = "1", features = ["v7"] } +rand = "0.9" diff --git a/eventcore-stress/src/backends.rs b/eventcore-stress/src/backends.rs new file mode 100644 index 0000000..083667c --- /dev/null +++ b/eventcore-stress/src/backends.rs @@ -0,0 +1,27 @@ +use std::env; + +use 
crate::config::BackendChoice; + +/// Create the postgres connection string from environment variables. +pub fn postgres_connection_string() -> String { + let port = env::var("POSTGRES_PORT").unwrap_or_else(|_| "5432".to_string()); + let host = env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".to_string()); + let user = env::var("POSTGRES_USER").unwrap_or_else(|_| "postgres".to_string()); + let password = env::var("POSTGRES_PASSWORD").unwrap_or_else(|_| "postgres".to_string()); + let db = env::var("POSTGRES_DB").unwrap_or_else(|_| "postgres".to_string()); + format!("postgres://{user}:{password}@{host}:{port}/{db}") +} + +/// Print which backend is being used. +pub fn print_backend_info(backend: &BackendChoice) { + match backend { + BackendChoice::Memory => println!("Using in-memory backend"), + BackendChoice::Sqlite => println!("Using SQLite in-memory backend"), + BackendChoice::Postgres => { + println!( + "Using PostgreSQL backend at {}", + postgres_connection_string() + ); + } + } +} diff --git a/eventcore-stress/src/config.rs b/eventcore-stress/src/config.rs new file mode 100644 index 0000000..35f4261 --- /dev/null +++ b/eventcore-stress/src/config.rs @@ -0,0 +1,78 @@ +use std::fmt; +use std::time::Duration; + +/// Which event store backend to stress test. +#[derive(Debug, Clone, clap::ValueEnum)] +pub enum BackendChoice { + Memory, + Sqlite, + Postgres, +} + +impl fmt::Display for BackendChoice { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BackendChoice::Memory => write!(f, "memory"), + BackendChoice::Sqlite => write!(f, "sqlite"), + BackendChoice::Postgres => write!(f, "postgres"), + } + } +} + +/// Common configuration for all stress test scenarios. +#[derive(Debug, Clone)] +pub struct StressConfig { + pub backend: BackendChoice, + pub concurrency: u32, + pub duration: Option, + pub iterations: Option, +} + +impl StressConfig { + /// Determine the effective termination condition. 
+ /// If neither duration nor iterations is set, default to 10 seconds. + pub fn effective_duration(&self) -> Option { + if self.iterations.is_some() { + None + } else { + Some(self.duration.unwrap_or(Duration::from_secs(10))) + } + } + + pub fn effective_iterations(&self) -> Option { + self.iterations + } +} + +/// Parse a duration string like "10s", "30s", or "1m" (combined forms like "2m30s" are not supported). +pub fn parse_duration(s: &str) -> Result { + let s = s.trim(); + if s.is_empty() { + return Err("empty duration string".to_string()); + } + + // Try simple patterns first + if let Some(secs) = s.strip_suffix('s') { + let secs: u64 = secs + .trim() + .parse() + .map_err(|e| format!("invalid seconds: {e}"))?; + return Ok(Duration::from_secs(secs)); + } + + if let Some(mins) = s.strip_suffix('m') { + // Check if there's an 's' component, like "2m30s" — but we already + // stripped 's' above, so this is just "Nm" + let mins: u64 = mins + .trim() + .parse() + .map_err(|e| format!("invalid minutes: {e}"))?; + return Ok(Duration::from_secs(mins * 60)); + } + + // Try parsing as raw seconds + let secs: u64 = s + .parse() + .map_err(|_| format!("unrecognized duration format: {s}"))?; + Ok(Duration::from_secs(secs)) +} diff --git a/eventcore-stress/src/domain.rs b/eventcore-stress/src/domain.rs new file mode 100644 index 0000000..8dbe614 --- /dev/null +++ b/eventcore-stress/src/domain.rs @@ -0,0 +1,221 @@ +use eventcore::{ + Command, CommandError, CommandLogic, Event, NewEvents, RetryPolicy, StreamId, execute, +}; +use nutype::nutype; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +// ============================================================================= +// Domain Types +// ============================================================================= + +#[nutype( + validate(greater = 0), + derive(Debug, Clone, Copy, PartialEq, Eq, Into, Serialize, Deserialize) +)] +pub struct MoneyAmount(u16); + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum BankAccountEvent {
+ MoneyDeposited { + account_id: StreamId, + amount: MoneyAmount, + }, + MoneyWithdrawn { + account_id: StreamId, + amount: MoneyAmount, + }, +} + +impl Event for BankAccountEvent { + fn stream_id(&self) -> &StreamId { + match self { + BankAccountEvent::MoneyDeposited { account_id, .. } + | BankAccountEvent::MoneyWithdrawn { account_id, .. } => account_id, + } + } + + fn event_type_name() -> &'static str { + "BankAccountEvent" + } +} + +// ============================================================================= +// State +// ============================================================================= + +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub struct AccountBalance { + cents: u16, +} + +impl AccountBalance { + fn apply(self, event: &BankAccountEvent) -> Self { + match event { + BankAccountEvent::MoneyDeposited { amount, .. } => Self { + cents: self.cents.saturating_add((*amount).into()), + }, + BankAccountEvent::MoneyWithdrawn { amount, .. } => Self { + cents: self.cents.saturating_sub((*amount).into()), + }, + } + } +} + +// ============================================================================= +// Commands +// ============================================================================= + +/// Single-stream deposit: no state reconstruction needed. +#[derive(Clone, Command)] +pub struct Deposit { + #[stream] + pub account_id: StreamId, + pub amount: MoneyAmount, +} + +impl CommandLogic for Deposit { + type Event = BankAccountEvent; + type State = (); + + fn apply(&self, state: Self::State, _event: &Self::Event) -> Self::State { + state + } + + fn handle(&self, _state: Self::State) -> Result, CommandError> { + Ok(vec![BankAccountEvent::MoneyDeposited { + account_id: self.account_id.clone(), + amount: self.amount, + }] + .into()) + } +} + +/// Single-stream withdrawal: requires state reconstruction for balance check. 
+#[derive(Clone, Command)] +pub struct Withdraw { + #[stream] + pub account_id: StreamId, + pub amount: MoneyAmount, +} + +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +enum WithdrawError { + #[error("insufficient-funds")] + InsufficientFunds, +} + +impl CommandLogic for Withdraw { + type Event = BankAccountEvent; + type State = AccountBalance; + + fn apply(&self, state: Self::State, event: &Self::Event) -> Self::State { + state.apply(event) + } + + fn handle(&self, state: Self::State) -> Result, CommandError> { + if state.cents < self.amount.into() { + return Err(CommandError::BusinessRuleViolation(Box::new( + WithdrawError::InsufficientFunds, + ))); + } + + Ok(vec![BankAccountEvent::MoneyWithdrawn { + account_id: self.account_id.clone(), + amount: self.amount, + }] + .into()) + } +} + +// ============================================================================= +// Transfer Types (Multi-Stream) +// ============================================================================= + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum TransferEvent { + Debited { + account_id: StreamId, + amount: MoneyAmount, + }, + Credited { + account_id: StreamId, + amount: MoneyAmount, + }, +} + +impl Event for TransferEvent { + fn stream_id(&self) -> &StreamId { + match self { + TransferEvent::Debited { account_id, .. } + | TransferEvent::Credited { account_id, .. } => account_id, + } + } + + fn event_type_name() -> &'static str { + "TransferEvent" + } +} + +/// Multi-stream atomic transfer across two accounts. 
+#[derive(Clone, Command)] +pub struct TransferMoney { + #[stream] + pub from: StreamId, + #[stream] + pub to: StreamId, + pub amount: MoneyAmount, +} + +impl CommandLogic for TransferMoney { + type Event = TransferEvent; + type State = (); + + fn apply(&self, state: Self::State, _event: &Self::Event) -> Self::State { + state + } + + fn handle(&self, _state: Self::State) -> Result, CommandError> { + Ok(vec![ + TransferEvent::Debited { + account_id: self.from.clone(), + amount: self.amount, + }, + TransferEvent::Credited { + account_id: self.to.clone(), + amount: self.amount, + }, + ] + .into()) + } +} + +// ============================================================================= +// Helpers +// ============================================================================= + +pub fn new_stream_id() -> StreamId { + StreamId::try_new(Uuid::now_v7().to_string()).expect("valid stream id") +} + +pub fn test_amount(cents: u16) -> MoneyAmount { + MoneyAmount::try_new(cents).expect("valid amount") +} + +/// Seed a stream with N deposit events using execute(). 
+pub async fn seed_stream( + store: &S, + account_id: &StreamId, + count: usize, +) { + let amount = test_amount(100); + for _ in 0..count { + let cmd = Deposit { + account_id: account_id.clone(), + amount, + }; + let _response = execute(store, cmd, RetryPolicy::new()) + .await + .expect("seed deposit should succeed"); + } +} diff --git a/eventcore-stress/src/main.rs b/eventcore-stress/src/main.rs new file mode 100644 index 0000000..a129d6f --- /dev/null +++ b/eventcore-stress/src/main.rs @@ -0,0 +1,273 @@ +mod backends; +mod config; +mod domain; +mod metrics; +mod runner; +mod scenarios; + +use clap::{Parser, Subcommand}; + +use crate::config::{BackendChoice, StressConfig, parse_duration}; + +#[derive(Parser)] +#[command( + name = "eventcore-stress", + about = "Stress testing tool for EventCore backends" +)] +struct Cli { + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Concurrent single-stream contention test + Contention { + /// Backend to test against + #[arg(long, default_value = "memory", value_enum)] + backend: BackendChoice, + + /// Number of concurrent tasks + #[arg(long, default_value_t = 20)] + concurrency: u32, + + /// Test duration (e.g. "10s", "30s", "1m") + #[arg(long, value_parser = parse_duration)] + duration: Option, + + /// Number of iterations (overrides duration) + #[arg(long)] + iterations: Option, + }, + + /// Concurrent multi-stream transfer test + Transfers { + /// Backend to test against + #[arg(long, default_value = "memory", value_enum)] + backend: BackendChoice, + + /// Number of concurrent tasks + #[arg(long, default_value_t = 20)] + concurrency: u32, + + /// Test duration (e.g. 
"10s", "30s", "1m") + #[arg(long, value_parser = parse_duration)] + duration: Option, + + /// Number of iterations (overrides duration) + #[arg(long)] + iterations: Option, + + /// Number of accounts in the transfer pool + #[arg(long, default_value_t = 10)] + accounts: u32, + }, + + /// High-throughput sequential append test (per-task streams) + Throughput { + /// Backend to test against + #[arg(long, default_value = "memory", value_enum)] + backend: BackendChoice, + + /// Number of concurrent tasks + #[arg(long, default_value_t = 20)] + concurrency: u32, + + /// Test duration (e.g. "10s", "30s", "1m") + #[arg(long, value_parser = parse_duration)] + duration: Option, + + /// Number of iterations (overrides duration) + #[arg(long)] + iterations: Option, + }, + + /// Projection catch-up after concurrent writes + Projection { + /// Backend to test against + #[arg(long, default_value = "memory", value_enum)] + backend: BackendChoice, + + /// Number of concurrent writer tasks + #[arg(long, default_value_t = 20)] + concurrency: u32, + + /// Number of events to write per task + #[arg(long)] + iterations: Option, + + /// Test duration (ignored for projection; iterations controls write volume) + #[arg(long, value_parser = parse_duration)] + duration: Option, + }, + + /// Postgres-only pool saturation test + PoolSaturation { + /// Backend to test against (must be postgres) + #[arg(long, default_value = "postgres", value_enum)] + backend: BackendChoice, + + /// Number of concurrent tasks (default 100 for saturation) + #[arg(long, default_value_t = 100)] + concurrency: u32, + + /// Test duration (e.g. 
"10s", "30s", "1m") + #[arg(long, value_parser = parse_duration)] + duration: Option, + + /// Number of iterations (overrides duration) + #[arg(long)] + iterations: Option, + }, + + /// Run all applicable scenarios + RunAll { + /// Backend to test against + #[arg(long, default_value = "memory", value_enum)] + backend: BackendChoice, + + /// Number of concurrent tasks + #[arg(long, default_value_t = 20)] + concurrency: u32, + + /// Test duration per scenario (e.g. "10s", "30s", "1m") + #[arg(long, value_parser = parse_duration)] + duration: Option, + }, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cli = Cli::parse(); + + match cli.command { + Commands::Contention { + backend, + concurrency, + duration, + iterations, + } => { + let config = StressConfig { + backend: backend.clone(), + concurrency, + duration, + iterations, + }; + backends::print_backend_info(&backend); + let report = scenarios::contention::run(&config).await; + print!("{report}"); + } + + Commands::Transfers { + backend, + concurrency, + duration, + iterations, + accounts, + } => { + let config = StressConfig { + backend: backend.clone(), + concurrency, + duration, + iterations, + }; + backends::print_backend_info(&backend); + let report = scenarios::transfers::run(&config, accounts).await; + print!("{report}"); + } + + Commands::Throughput { + backend, + concurrency, + duration, + iterations, + } => { + let config = StressConfig { + backend: backend.clone(), + concurrency, + duration, + iterations, + }; + backends::print_backend_info(&backend); + let report = scenarios::throughput::run(&config).await; + print!("{report}"); + } + + Commands::Projection { + backend, + concurrency, + iterations, + duration, + } => { + let config = StressConfig { + backend: backend.clone(), + concurrency, + duration, + iterations, + }; + backends::print_backend_info(&backend); + let report = scenarios::projection::run(&config).await; + print!("{report}"); + } + + Commands::PoolSaturation { + backend, 
+ concurrency, + duration, + iterations, + } => { + let config = StressConfig { + backend: backend.clone(), + concurrency, + duration, + iterations, + }; + backends::print_backend_info(&backend); + if let Some(report) = scenarios::pool_saturation::run(&config).await { + print!("{report}"); + } + } + + Commands::RunAll { + backend, + concurrency, + duration, + } => { + let config = StressConfig { + backend: backend.clone(), + concurrency, + duration, + iterations: None, + }; + backends::print_backend_info(&backend); + + println!("\n--- Running all scenarios ---\n"); + + let report = scenarios::contention::run(&config).await; + print!("{report}"); + + let report = scenarios::throughput::run(&config).await; + print!("{report}"); + + let report = scenarios::transfers::run(&config, 10).await; + print!("{report}"); + + let proj_config = StressConfig { + iterations: Some(100), + ..config.clone() + }; + let report = scenarios::projection::run(&proj_config).await; + print!("{report}"); + + if matches!(backend, BackendChoice::Postgres) + && let Some(report) = scenarios::pool_saturation::run(&config).await + { + print!("{report}"); + } + + println!("\n--- All scenarios complete ---"); + } + } + + Ok(()) +} diff --git a/eventcore-stress/src/metrics.rs b/eventcore-stress/src/metrics.rs new file mode 100644 index 0000000..3f2f212 --- /dev/null +++ b/eventcore-stress/src/metrics.rs @@ -0,0 +1,164 @@ +use std::fmt; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; + +use hdrhistogram::Histogram; + +/// Per-task metrics collector. Each spawned task owns one of these to avoid +/// contention on the histogram. After the task finishes, its metrics are sent +/// back via a channel for merging. 
+pub struct TaskMetrics { + histogram: Histogram, + successes: u64, + errors: u64, + retries: u64, +} + +impl TaskMetrics { + pub fn new() -> Self { + Self { + // 3 significant figures, max value 60 seconds in microseconds + histogram: Histogram::new_with_max(60_000_000, 3).expect("valid histogram config"), + successes: 0, + errors: 0, + retries: 0, + } + } + + pub fn record(&mut self, latency: Duration, success: bool, retries: u64) { + let micros = latency.as_micros() as u64; + let _ = self.histogram.record(micros); + if success { + self.successes += 1; + } else { + self.errors += 1; + } + self.retries += retries; + } +} + +/// Merged metrics from all tasks in a scenario run. +pub struct StressMetrics { + histogram: Histogram, + pub successes: u64, + pub errors: u64, + pub retries: u64, +} + +impl StressMetrics { + pub fn total(&self) -> u64 { + self.successes + self.errors + } + + pub fn p50(&self) -> u64 { + self.histogram.value_at_quantile(0.50) + } + + pub fn p95(&self) -> u64 { + self.histogram.value_at_quantile(0.95) + } + + pub fn p99(&self) -> u64 { + self.histogram.value_at_quantile(0.99) + } + + pub fn max(&self) -> u64 { + self.histogram.max() + } +} + +/// Merge multiple per-task metrics into a single StressMetrics. +pub fn merge_task_metrics(tasks: Vec) -> StressMetrics { + let mut merged = Histogram::::new_with_max(60_000_000, 3).expect("valid histogram config"); + let mut successes = 0u64; + let mut errors = 0u64; + let mut retries = 0u64; + + for t in tasks { + merged + .add(&t.histogram) + .expect("histogram merge should succeed"); + successes += t.successes; + errors += t.errors; + retries += t.retries; + } + + StressMetrics { + histogram: merged, + successes, + errors, + retries, + } +} + +/// Final report for a stress test scenario. 
+pub struct MetricsReport { + pub scenario_name: String, + pub backend: String, + pub concurrency: u32, + pub elapsed: Duration, + pub metrics: StressMetrics, + pub correctness_passed: Option, +} + +impl fmt::Display for MetricsReport { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let total = self.metrics.total(); + let ops_per_sec = if self.elapsed.as_secs_f64() > 0.0 { + total as f64 / self.elapsed.as_secs_f64() + } else { + 0.0 + }; + let retry_pct = if total > 0 { + (self.metrics.retries as f64 / total as f64) * 100.0 + } else { + 0.0 + }; + + writeln!(f)?; + writeln!(f, "=== Stress Test: {} ===", self.scenario_name)?; + writeln!(f, "Backend: {}", self.backend)?; + writeln!(f, "Concurrency: {}", self.concurrency)?; + writeln!(f, "Duration: {:.2}s", self.elapsed.as_secs_f64())?; + writeln!(f, "Total ops: {total}")?; + writeln!(f, "Throughput: {ops_per_sec:.0} ops/sec")?; + writeln!(f, "Latency:")?; + writeln!(f, " p50: {} us", self.metrics.p50())?; + writeln!(f, " p95: {} us", self.metrics.p95())?; + writeln!(f, " p99: {} us", self.metrics.p99())?; + writeln!(f, " max: {} us", self.metrics.max())?; + writeln!(f, "Errors: {}", self.metrics.errors)?; + writeln!( + f, + "Retries: {} ({retry_pct:.1}% of ops)", + self.metrics.retries + )?; + + if let Some(passed) = self.correctness_passed { + writeln!(f, "Correctness: {}", if passed { "PASS" } else { "FAIL" })?; + } + + Ok(()) + } +} + +/// Shared retry counter using atomics. Clone-friendly because the inner +/// AtomicU64 is behind an Arc. 
+#[derive(Clone)]
+pub struct RetryCounter(pub std::sync::Arc<AtomicU64>);
+
+impl RetryCounter {
+    pub fn new() -> Self {
+        Self(std::sync::Arc::new(AtomicU64::new(0)))
+    }
+
+    pub fn count(&self) -> u64 {
+        self.0.load(Ordering::Relaxed)
+    }
+}
+
+impl eventcore::MetricsHook for RetryCounter {
+    fn on_retry_attempt(&self, _ctx: &eventcore::RetryContext) {
+        self.0.fetch_add(1, Ordering::Relaxed);
+    }
+}
diff --git a/eventcore-stress/src/runner.rs b/eventcore-stress/src/runner.rs
new file mode 100644
index 0000000..738b017
--- /dev/null
+++ b/eventcore-stress/src/runner.rs
@@ -0,0 +1,133 @@
+use std::future::Future;
+use std::sync::Arc;
+use std::time::Instant;
+
+use tokio::sync::mpsc;
+
+use crate::config::StressConfig;
+use crate::metrics::{MetricsReport, TaskMetrics, merge_task_metrics};
+
+/// Result of a single stress test operation.
+pub struct OperationResult {
+    pub success: bool,
+    pub retries: u64,
+}
+
+/// Run a stress test scenario with the given configuration.
+///
+/// Spawns `concurrency` tokio tasks, each running the `operation` closure in a
+/// loop until the termination condition (duration or iteration count) is met.
+///
+/// The `operation` closure receives the task index (0-based) and returns an
+/// `OperationResult`.
+pub async fn run_stress<F, Fut>(
+    config: &StressConfig,
+    scenario_name: &str,
+    operation: F,
+) -> MetricsReport
+where
+    F: Fn(u32) -> Fut + Send + Sync + 'static,
+    Fut: Future<Output = OperationResult> + Send,
+{
+    let concurrency = config.concurrency;
+    let effective_duration = config.effective_duration();
+    let effective_iterations = config.effective_iterations();
+
+    let (tx, mut rx) = mpsc::channel::<TaskMetrics>(concurrency as usize);
+    let operation = Arc::new(operation);
+    let start = Instant::now();
+
+    // Per-task iteration count: divide total iterations evenly, give remainder
+    // to the first tasks.
+ let per_task_iters = effective_iterations.map(|total| { + let base = total / concurrency as u64; + let remainder = total % concurrency as u64; + (base, remainder) + }); + + for task_idx in 0..concurrency { + let tx = tx.clone(); + let op = Arc::clone(&operation); + let task_iters = per_task_iters.map(|(base, remainder)| { + if (task_idx as u64) < remainder { + base + 1 + } else { + base + } + }); + + tokio::spawn(async move { + let mut metrics = TaskMetrics::new(); + let mut count = 0u64; + + loop { + // Check termination condition + if let Some(max_iters) = task_iters + && count >= max_iters + { + break; + } + if let Some(dur) = effective_duration + && start.elapsed() >= dur + { + break; + } + + let op_start = Instant::now(); + let result = op(task_idx).await; + let latency = op_start.elapsed(); + + metrics.record(latency, result.success, result.retries); + count += 1; + } + + let _ = tx.send(metrics).await; + }); + } + + // Drop our sender so the receiver closes when all tasks are done. + drop(tx); + + let mut all_metrics = Vec::with_capacity(concurrency as usize); + while let Some(m) = rx.recv().await { + all_metrics.push(m); + } + + let elapsed = start.elapsed(); + let merged = merge_task_metrics(all_metrics); + + MetricsReport { + scenario_name: scenario_name.to_string(), + backend: config.backend.to_string(), + concurrency, + elapsed, + metrics: merged, + correctness_passed: None, + } +} + +/// Like `run_stress`, but allows a post-run correctness check that can set +/// the correctness_passed field on the report. 
+pub async fn run_stress_with_correctness<F, Fut, C, CFut>(
+    config: &StressConfig,
+    scenario_name: &str,
+    operation: F,
+    correctness_check: C,
+) -> MetricsReport
+where
+    F: Fn(u32) -> Fut + Send + Sync + 'static,
+    Fut: Future<Output = OperationResult> + Send,
+    C: FnOnce(u64) -> CFut,
+    CFut: Future<Output = bool>,
+{
+    let mut report = run_stress(config, scenario_name, operation).await;
+    let passed = correctness_check(report.metrics.successes).await;
+    report.correctness_passed = Some(passed);
+    report
+}
+
+/// Returns `true` once the current time has reached or passed `deadline`;
+/// helper for scenarios that manage their own duration-based termination.
+pub fn is_past_deadline(deadline: Instant) -> bool {
+    Instant::now() >= deadline
+}
diff --git a/eventcore-stress/src/scenarios/contention.rs b/eventcore-stress/src/scenarios/contention.rs
new file mode 100644
index 0000000..0665c08
--- /dev/null
+++ b/eventcore-stress/src/scenarios/contention.rs
@@ -0,0 +1,119 @@
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use eventcore::{RetryPolicy, StreamId, execute};
+use eventcore_types::EventStore;
+
+use crate::config::{BackendChoice, StressConfig};
+use crate::domain::{Deposit, new_stream_id, test_amount};
+use crate::metrics::{MetricsReport, RetryCounter};
+use crate::runner::{OperationResult, run_stress_with_correctness};
+
+/// Run the contention scenario: N tasks all deposit to a single shared stream.
+pub async fn run(config: &StressConfig) -> MetricsReport { + match config.backend { + BackendChoice::Memory => { + let store = Arc::new(eventcore_memory::InMemoryEventStore::new()); + run_inner(config, store).await + } + BackendChoice::Sqlite => { + let store = Arc::new( + eventcore_sqlite::SqliteEventStore::in_memory() + .expect("failed to create SQLite store"), + ); + store.migrate().await.expect("SQLite migration failed"); + run_inner(config, store).await + } + BackendChoice::Postgres => { + let conn = crate::backends::postgres_connection_string(); + let store = Arc::new( + eventcore_postgres::PostgresEventStore::new(conn) + .await + .expect("failed to connect to PostgreSQL"), + ); + store.migrate().await; + run_inner(config, store).await + } + } +} + +async fn run_inner(config: &StressConfig, store: Arc) -> MetricsReport +where + S: EventStore + Sync + Send + 'static, +{ + let shared_stream = Arc::new(new_stream_id()); + let retry_counter = RetryCounter::new(); + let success_count = Arc::new(AtomicU64::new(0)); + + let store_ref = Arc::clone(&store); + let stream_ref = Arc::clone(&shared_stream); + let counter_ref = retry_counter.clone(); + let success_ref = Arc::clone(&success_count); + + run_stress_with_correctness( + config, + "Contention (single-stream)", + move |_task_idx| { + let store = Arc::clone(&store_ref); + let stream = Arc::clone(&stream_ref); + let counter = counter_ref.clone(); + let successes = Arc::clone(&success_ref); + async move { + let cmd = Deposit { + account_id: StreamId::clone(&stream), + amount: test_amount(1), + }; + let policy = RetryPolicy::new() + .max_retries(20) + .with_metrics_hook(counter.clone()); + let retries_before = counter.count(); + let result = execute(store.as_ref(), cmd, policy).await; + let retries_after = counter.count(); + let op_retries = retries_after.saturating_sub(retries_before); + match result { + Ok(_) => { + successes.fetch_add(1, Ordering::Relaxed); + OperationResult { + success: true, + retries: 
op_retries, + } + } + Err(_) => OperationResult { + success: false, + retries: op_retries, + }, + } + } + }, + { + let store = Arc::clone(&store); + let stream = Arc::clone(&shared_stream); + let successes = Arc::clone(&success_count); + move |_reported_successes| async move { + // Correctness: event count in the stream must equal successful ops + let reader = store + .read_stream::(StreamId::clone(&stream)) + .await; + match reader { + Ok(r) => { + let event_count = r.len() as u64; + let expected = successes.load(Ordering::Relaxed); + if event_count != expected { + eprintln!( + "CORRECTNESS FAILURE: expected {expected} events, found {event_count}" + ); + false + } else { + true + } + } + Err(e) => { + eprintln!("Failed to read stream for correctness check: {e}"); + false + } + } + } + }, + ) + .await +} diff --git a/eventcore-stress/src/scenarios/mod.rs b/eventcore-stress/src/scenarios/mod.rs new file mode 100644 index 0000000..0fef315 --- /dev/null +++ b/eventcore-stress/src/scenarios/mod.rs @@ -0,0 +1,5 @@ +pub mod contention; +pub mod pool_saturation; +pub mod projection; +pub mod throughput; +pub mod transfers; diff --git a/eventcore-stress/src/scenarios/pool_saturation.rs b/eventcore-stress/src/scenarios/pool_saturation.rs new file mode 100644 index 0000000..bab44dd --- /dev/null +++ b/eventcore-stress/src/scenarios/pool_saturation.rs @@ -0,0 +1,98 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; + +use eventcore::{RetryPolicy, execute}; +use eventcore_postgres::{MaxConnections, PostgresConfig, PostgresEventStore}; + +use crate::config::{BackendChoice, StressConfig}; +use crate::domain::{Deposit, new_stream_id, test_amount}; +use crate::metrics::MetricsReport; +use crate::runner::{OperationResult, run_stress_with_correctness}; + +/// Run the pool saturation scenario (postgres only). 
+/// Creates a postgres store with a very small connection pool and runs many +/// concurrent tasks to observe pool contention and timeout behavior. +pub async fn run(config: &StressConfig) -> Option { + match config.backend { + BackendChoice::Postgres => {} + _ => { + println!("Pool saturation scenario is postgres-only. Skipping."); + return None; + } + } + + let conn = crate::backends::postgres_connection_string(); + + // Create store with a very small pool (5 connections) + let pg_config = PostgresConfig { + max_connections: MaxConnections::new(std::num::NonZeroU32::new(5).expect("5 is non-zero")), + acquire_timeout: Duration::from_secs(5), + ..PostgresConfig::default() + }; + + let store = match PostgresEventStore::with_config(&conn, pg_config).await { + Ok(s) => Arc::new(s), + Err(e) => { + eprintln!("Failed to connect to PostgreSQL: {e}"); + return None; + } + }; + + store.migrate().await; + + let total_successes = Arc::new(AtomicU64::new(0)); + let store_ref = Arc::clone(&store); + let success_ref = Arc::clone(&total_successes); + + // Override concurrency to 100 tasks against 5 connections + let saturated_config = StressConfig { + concurrency: 100, + ..config.clone() + }; + + let report = run_stress_with_correctness( + &saturated_config, + "Pool Saturation (postgres, 5 conns x 100 tasks)", + move |_task_idx| { + let store = Arc::clone(&store_ref); + let successes = Arc::clone(&success_ref); + async move { + let account_id = new_stream_id(); + let cmd = Deposit { + account_id, + amount: test_amount(1), + }; + // Use a modest retry policy since pool timeouts are expected + let policy = RetryPolicy::new().max_retries(3); + let result = execute(store.as_ref(), cmd, policy).await; + match result { + Ok(_) => { + successes.fetch_add(1, Ordering::Relaxed); + OperationResult { + success: true, + retries: 0, + } + } + Err(_) => OperationResult { + success: false, + retries: 0, + }, + } + } + }, + { + let successes = Arc::clone(&total_successes); + move |_| async 
move { + // Pool saturation expects some errors; just report + let count = successes.load(Ordering::Relaxed); + println!("Pool saturation: {count} successful operations out of attempted"); + // Don't fail correctness for pool saturation; just report + true + } + }, + ) + .await; + + Some(report) +} diff --git a/eventcore-stress/src/scenarios/projection.rs b/eventcore-stress/src/scenarios/projection.rs new file mode 100644 index 0000000..55d93c9 --- /dev/null +++ b/eventcore-stress/src/scenarios/projection.rs @@ -0,0 +1,217 @@ +use std::collections::HashMap; +use std::convert::Infallible; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +use eventcore::{ProjectionConfig, RetryPolicy, execute, run_projection}; +use eventcore_types::{EventStore, Projector, StreamId, StreamPosition}; + +use crate::config::{BackendChoice, StressConfig}; +use crate::domain::{BankAccountEvent, Deposit, new_stream_id, test_amount}; +use crate::metrics::MetricsReport; + +/// Balance-tracking projector for stress testing. +pub struct BalanceTrackingProjector { + balances: Arc>>, +} + +impl BalanceTrackingProjector { + fn new() -> Self { + Self { + balances: Arc::new(Mutex::new(HashMap::new())), + } + } + + fn balances(&self) -> Arc>> { + Arc::clone(&self.balances) + } +} + +impl Projector for BalanceTrackingProjector { + type Event = BankAccountEvent; + type Error = Infallible; + type Context = (); + + fn apply( + &mut self, + event: Self::Event, + _position: StreamPosition, + _ctx: &mut Self::Context, + ) -> Result<(), Self::Error> { + let mut balances = self.balances.lock().expect("lock poisoned in projector"); + match &event { + BankAccountEvent::MoneyDeposited { + account_id, amount, .. + } => { + let amt: u16 = (*amount).into(); + *balances.entry(account_id.to_string()).or_insert(0) += amt as u64; + } + BankAccountEvent::MoneyWithdrawn { + account_id, amount, .. 
+ } => { + let amt: u16 = (*amount).into(); + let entry = balances.entry(account_id.to_string()).or_insert(0); + *entry = entry.saturating_sub(amt as u64); + } + } + Ok(()) + } + + fn name(&self) -> &str { + "balance-tracker" + } +} + +/// Run the projection scenario: +/// Phase 1: Write events concurrently (like throughput scenario) +/// Phase 2: Run projection and measure catch-up performance +pub async fn run(config: &StressConfig) -> MetricsReport { + match config.backend { + BackendChoice::Memory => { + let store = Arc::new(eventcore_memory::InMemoryEventStore::new()); + run_inner(config, store).await + } + BackendChoice::Sqlite => { + let store = Arc::new( + eventcore_sqlite::SqliteEventStore::in_memory() + .expect("failed to create SQLite store"), + ); + store.migrate().await.expect("SQLite migration failed"); + run_inner(config, store).await + } + BackendChoice::Postgres => { + let conn = crate::backends::postgres_connection_string(); + let store = Arc::new( + eventcore_postgres::PostgresEventStore::new(conn) + .await + .expect("failed to connect to PostgreSQL"), + ); + store.migrate().await; + run_inner(config, store).await + } + } +} + +async fn run_inner(config: &StressConfig, store: Arc) -> MetricsReport +where + S: EventStore + + eventcore_types::EventReader + + eventcore_types::CheckpointStore + + eventcore_types::ProjectorCoordinator + + Sync + + Send + + 'static, + ::Error: std::fmt::Display, + ::Error: std::fmt::Debug, + ::Error: + std::fmt::Debug + std::error::Error + Send + Sync + 'static, +{ + let concurrency = config.concurrency; + + // Phase 1: Write events concurrently + println!("Phase 1: Writing events..."); + let streams: Vec = (0..concurrency).map(|_| new_stream_id()).collect(); + let streams = Arc::new(streams); + + // Use a fixed number of writes per task for the write phase. + // Batch-mode projection reads up to 1000 events in a single pass, + // so we cap total events to stay under that limit. 
+ let max_per_task = 900 / concurrency.max(1) as u64; + let writes_per_task = config + .effective_iterations() + .map(|n| n.min(max_per_task)) + .unwrap_or(max_per_task) + .max(5); // at least 5 + + let mut handles = Vec::new(); + for task_idx in 0..concurrency { + let store = Arc::clone(&store); + let streams = Arc::clone(&streams); + handles.push(tokio::spawn(async move { + let account_id = streams[task_idx as usize].clone(); + let mut successes = 0u64; + for _ in 0..writes_per_task { + let cmd = Deposit { + account_id: account_id.clone(), + amount: test_amount(1), + }; + if execute(store.as_ref(), cmd, RetryPolicy::new()) + .await + .is_ok() + { + successes += 1; + } + } + successes + })); + } + + let mut total_events = 0u64; + for h in handles { + total_events += h.await.unwrap_or(0); + } + + println!("Phase 1 complete: {total_events} events written across {concurrency} streams"); + + // Phase 2: Run projection and measure catch-up time + println!("Phase 2: Running projection catch-up..."); + let projector = BalanceTrackingProjector::new(); + let balances = projector.balances(); + + let proj_start = Instant::now(); + let proj_result = run_projection(projector, store.as_ref(), ProjectionConfig::default()).await; + let proj_elapsed = proj_start.elapsed(); + + let correctness_passed = match proj_result { + Ok(()) => { + // Verify projection correctness + let bal = balances.lock().expect("lock poisoned"); + let mut all_correct = true; + for (stream_id_str, balance) in bal.iter() { + // Each stream should have writes_per_task deposits of 1 cent each + // (assuming all succeeded for that stream) + if *balance == 0 { + eprintln!("CORRECTNESS WARNING: stream {stream_id_str} has zero balance"); + all_correct = false; + } + } + if bal.len() != concurrency as usize { + eprintln!( + "CORRECTNESS WARNING: expected {} streams in projection, found {}", + concurrency, + bal.len() + ); + all_correct = false; + } + all_correct + } + Err(e) => { + eprintln!("Projection failed: 
{e}"); + false + } + }; + + let events_per_sec = if proj_elapsed.as_secs_f64() > 0.0 { + total_events as f64 / proj_elapsed.as_secs_f64() + } else { + 0.0 + }; + + println!( + "Phase 2 complete: projected {total_events} events in {:.2}s ({events_per_sec:.0} events/sec)", + proj_elapsed.as_secs_f64() + ); + + // Build a synthetic report for the projection phase + let mut metrics = crate::metrics::merge_task_metrics(vec![crate::metrics::TaskMetrics::new()]); + metrics.successes = total_events; + + MetricsReport { + scenario_name: "Projection (catch-up)".to_string(), + backend: config.backend.to_string(), + concurrency, + elapsed: proj_elapsed, + metrics, + correctness_passed: Some(correctness_passed), + } +} diff --git a/eventcore-stress/src/scenarios/throughput.rs b/eventcore-stress/src/scenarios/throughput.rs new file mode 100644 index 0000000..e2ce3f1 --- /dev/null +++ b/eventcore-stress/src/scenarios/throughput.rs @@ -0,0 +1,98 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use eventcore::{RetryPolicy, execute}; +use eventcore_types::EventStore; + +use crate::config::{BackendChoice, StressConfig}; +use crate::domain::{Deposit, new_stream_id, test_amount}; +use crate::metrics::MetricsReport; +use crate::runner::{OperationResult, run_stress_with_correctness}; + +/// Run the throughput scenario: each task deposits to its own unique stream +/// (minimal contention, measures raw store throughput). 
+pub async fn run(config: &StressConfig) -> MetricsReport { + match config.backend { + BackendChoice::Memory => { + let store = Arc::new(eventcore_memory::InMemoryEventStore::new()); + run_inner(config, store).await + } + BackendChoice::Sqlite => { + let store = Arc::new( + eventcore_sqlite::SqliteEventStore::in_memory() + .expect("failed to create SQLite store"), + ); + store.migrate().await.expect("SQLite migration failed"); + run_inner(config, store).await + } + BackendChoice::Postgres => { + let conn = crate::backends::postgres_connection_string(); + let store = Arc::new( + eventcore_postgres::PostgresEventStore::new(conn) + .await + .expect("failed to connect to PostgreSQL"), + ); + store.migrate().await; + run_inner(config, store).await + } + } +} + +async fn run_inner(config: &StressConfig, store: Arc) -> MetricsReport +where + S: EventStore + Sync + Send + 'static, +{ + // Pre-create a unique stream ID per task + let concurrency = config.concurrency; + let streams: Arc> = Arc::new((0..concurrency).map(|_| new_stream_id()).collect()); + let total_successes = Arc::new(AtomicU64::new(0)); + + let store_ref = Arc::clone(&store); + let streams_ref = Arc::clone(&streams); + let success_ref = Arc::clone(&total_successes); + + run_stress_with_correctness( + config, + "Throughput (per-task streams)", + move |task_idx| { + let store = Arc::clone(&store_ref); + let streams = Arc::clone(&streams_ref); + let successes = Arc::clone(&success_ref); + async move { + let account_id = streams[task_idx as usize].clone(); + let cmd = Deposit { + account_id, + amount: test_amount(1), + }; + let result = execute(store.as_ref(), cmd, RetryPolicy::new()).await; + match result { + Ok(_) => { + successes.fetch_add(1, Ordering::Relaxed); + OperationResult { + success: true, + retries: 0, + } + } + Err(_) => OperationResult { + success: false, + retries: 0, + }, + } + } + }, + { + let successes = Arc::clone(&total_successes); + move |_| async move { + // Simple correctness: just verify 
we got some successes + let count = successes.load(Ordering::Relaxed); + if count == 0 { + eprintln!("CORRECTNESS FAILURE: zero successful operations"); + false + } else { + true + } + } + }, + ) + .await +} diff --git a/eventcore-stress/src/scenarios/transfers.rs b/eventcore-stress/src/scenarios/transfers.rs new file mode 100644 index 0000000..7f958dd --- /dev/null +++ b/eventcore-stress/src/scenarios/transfers.rs @@ -0,0 +1,150 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use eventcore::{RetryPolicy, StreamId, execute}; +use eventcore_types::EventStore; +use rand::Rng; + +use crate::config::{BackendChoice, StressConfig}; +use crate::domain::{TransferEvent, TransferMoney, new_stream_id, test_amount}; +use crate::metrics::{MetricsReport, RetryCounter}; +use crate::runner::{OperationResult, run_stress_with_correctness}; + +/// Run the transfers scenario: concurrent multi-stream atomic transfers +/// between a pool of accounts. +pub async fn run(config: &StressConfig, num_accounts: u32) -> MetricsReport { + match config.backend { + BackendChoice::Memory => { + let store = Arc::new(eventcore_memory::InMemoryEventStore::new()); + run_inner(config, store, num_accounts).await + } + BackendChoice::Sqlite => { + let store = Arc::new( + eventcore_sqlite::SqliteEventStore::in_memory() + .expect("failed to create SQLite store"), + ); + store.migrate().await.expect("SQLite migration failed"); + run_inner(config, store, num_accounts).await + } + BackendChoice::Postgres => { + let conn = crate::backends::postgres_connection_string(); + let store = Arc::new( + eventcore_postgres::PostgresEventStore::new(conn) + .await + .expect("failed to connect to PostgreSQL"), + ); + store.migrate().await; + run_inner(config, store, num_accounts).await + } + } +} + +async fn run_inner(config: &StressConfig, store: Arc, num_accounts: u32) -> MetricsReport +where + S: EventStore + Sync + Send + 'static, +{ + // Create N account stream IDs + let accounts: Arc> = + 
Arc::new((0..num_accounts).map(|_| new_stream_id()).collect()); + + let retry_counter = RetryCounter::new(); + let total_successes = Arc::new(AtomicU64::new(0)); + + let store_ref = Arc::clone(&store); + let accounts_ref = Arc::clone(&accounts); + let counter_ref = retry_counter.clone(); + let success_ref = Arc::clone(&total_successes); + + run_stress_with_correctness( + config, + "Transfers (multi-stream)", + move |_task_idx| { + let store = Arc::clone(&store_ref); + let accounts = Arc::clone(&accounts_ref); + let counter = counter_ref.clone(); + let successes = Arc::clone(&success_ref); + async move { + let (from_idx, to_idx) = { + let mut rng = rand::rng(); + let n = accounts.len(); + let from_idx = rng.random_range(0..n); + let mut to_idx = rng.random_range(0..n); + while to_idx == from_idx { + to_idx = rng.random_range(0..n); + } + (from_idx, to_idx) + }; + + let cmd = TransferMoney { + from: accounts[from_idx].clone(), + to: accounts[to_idx].clone(), + amount: test_amount(1), + }; + let policy = RetryPolicy::new() + .max_retries(20) + .with_metrics_hook(counter.clone()); + let retries_before = counter.count(); + let result = execute(store.as_ref(), cmd, policy).await; + let retries_after = counter.count(); + let op_retries = retries_after.saturating_sub(retries_before); + + match result { + Ok(_) => { + successes.fetch_add(1, Ordering::Relaxed); + OperationResult { + success: true, + retries: op_retries, + } + } + Err(_) => OperationResult { + success: false, + retries: op_retries, + }, + } + } + }, + { + let store = Arc::clone(&store); + let accounts = Arc::clone(&accounts); + move |_| async move { + // Correctness: conservation of money. + // Every successful transfer debits 1 from one account and credits 1 to + // another, so the sum across all accounts should be 0. 
+ let mut total_balance: i64 = 0; + for account_id in accounts.iter() { + let reader = store.read_stream::(account_id.clone()).await; + match reader { + Ok(r) => { + for event in r.into_iter() { + match event { + TransferEvent::Credited { amount, .. } => { + let amt: u16 = amount.into(); + total_balance += amt as i64; + } + TransferEvent::Debited { amount, .. } => { + let amt: u16 = amount.into(); + total_balance -= amt as i64; + } + } + } + } + Err(e) => { + eprintln!("Failed to read stream {} for correctness: {e}", account_id); + return false; + } + } + } + + if total_balance != 0 { + eprintln!( + "CORRECTNESS FAILURE: money not conserved, net balance = {total_balance}" + ); + false + } else { + true + } + } + }, + ) + .await +} From 4f2dd66ffb130410ca6c28ab0bf55b7ae804171e Mon Sep 17 00:00:00 2001 From: AI Bot Date: Wed, 15 Apr 2026 10:25:17 -0700 Subject: [PATCH 2/4] =?UTF-8?q?chore(deps):=20upgrade=20rand=200.9?= =?UTF-8?q?=E2=86=920.10,=20mutants=200.0.3=E2=86=920.0.4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bump rand to 0.10 across eventcore, eventcore-testing, and eventcore-stress. The Rng trait methods (random_range, random_bool) moved to RngExt in 0.10, so imports are updated accordingly. Bump mutants dev-dependency to 0.0.4 in eventcore-postgres (used only for #[mutants::skip] attributes). rusqlite remains at 0.32: upgrading to 0.39 is blocked by a libsqlite3-sys links conflict between rusqlite and sqlx-sqlite, which both require different major versions of the sys crate. 
--- Cargo.lock | 134 +++++++++++++------- eventcore-postgres/Cargo.toml | 2 +- eventcore-stress/Cargo.toml | 2 +- eventcore-stress/src/scenarios/transfers.rs | 2 +- eventcore-testing/Cargo.toml | 2 +- eventcore-testing/src/chaos.rs | 2 +- eventcore/Cargo.toml | 2 +- 7 files changed, 92 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e95a3b4..ae2d11e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -174,9 +174,9 @@ checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" [[package]] name = "bitflags" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" +checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" dependencies = [ "serde_core", ] @@ -222,9 +222,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.59" +version = "1.2.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7a4d3ec6524d28a329fc53654bbadc9bdd7b0431f5d65f1a56ffb28a1ee5283" +checksum = "43c5703da9466b66a946814e1adf53ea2c90f10063b86290cc9eb67ce3478a20" dependencies = [ "find-msvc-tools", "shlex", @@ -236,6 +236,17 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.1", +] + [[package]] name = "chrono" version = "0.4.44" @@ -362,6 +373,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc" version = "3.4.0" @@ -580,7 +600,7 @@ dependencies = [ "eventcore-types", "nutype", "proptest", - "rand 0.9.2", + "rand 0.10.1", "serde", "serde_json", "thiserror", @@ -704,7 +724,7 @@ dependencies = [ "eventcore-types", "hdrhistogram", "nutype", - "rand 0.9.2", + "rand 0.10.1", "serde", "serde_json", "thiserror", @@ -720,7 +740,7 @@ dependencies = [ "eventcore-memory", "eventcore-types", "nutype", - "rand 0.9.2", + "rand 0.10.1", "serde", "thiserror", "tokio", @@ -946,6 +966,7 @@ dependencies = [ "cfg-if", "libc", "r-efi 6.0.0", + "rand_core 0.10.1", "wasip2", "wasip3", ] @@ -989,9 +1010,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.16.1" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" [[package]] name = "hashlink" @@ -1199,12 +1220,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.13.1" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45a8a2b9cb3e0b0c1803dbb0758ffac5de2f425b23c28f518faabd9d805342ff" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" dependencies = [ "equivalent", - "hashbrown 0.16.1", + "hashbrown 0.17.0", "serde", "serde_core", ] @@ -1232,9 +1253,9 @@ checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" [[package]] name = "js-sys" -version = "0.3.94" +version = "0.3.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e04e2ef80ce82e13552136fabeef8a5ed1f985a96805761cbb9a2c34e7664d9" +checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca" dependencies = [ "once_cell", "wasm-bindgen", @@ -1278,9 +1299,9 @@ checksum = 
"09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.184" +version = "0.2.185" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" +checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" [[package]] name = "libm" @@ -1290,14 +1311,14 @@ checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" [[package]] name = "libredox" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ddbf48fd451246b1f8c2610bd3b4ac0cc6e149d89832867093ab69a17194f08" +checksum = "e02f3bb43d335493c96bf3fd3a321600bf6bd07ed34bc64118e9293bdffea46c" dependencies = [ "bitflags", "libc", "plain", - "redox_syscall 0.7.3", + "redox_syscall 0.7.4", ] [[package]] @@ -1393,9 +1414,9 @@ dependencies = [ [[package]] name = "mutants" -version = "0.0.3" +version = "0.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc0287524726960e07b119cebd01678f852f147742ae0d925e6a520dca956126" +checksum = "add0ac067452ff1aca8c5002111bd6b1c895baee6e45fcbc44e0193aea17be56" [[package]] name = "nom" @@ -1515,9 +1536,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.112" +version = "0.9.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57d55af3b3e226502be1526dfdba67ab0e9c96fc293004e79576b2b9edb0dbdb" +checksum = "ad2f2c0eba47118757e4c6d2bff2838f3e0523380021356e7875e858372ce644" dependencies = [ "cc", "libc", @@ -1609,9 +1630,9 @@ dependencies = [ [[package]] name = "pkg-config" -version = "0.3.32" +version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" [[package]] name = "plain" @@ -1694,7 +1715,7 @@ 
dependencies = [ "bit-vec", "bitflags", "num-traits", - "rand 0.9.2", + "rand 0.9.4", "rand_chacha 0.9.0", "rand_xorshift", "regex-syntax", @@ -1743,14 +1764,25 @@ dependencies = [ [[package]] name = "rand" -version = "0.9.2" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.5", ] +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.1", +] + [[package]] name = "rand_chacha" version = "0.3.1" @@ -1789,6 +1821,12 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_core" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" + [[package]] name = "rand_xorshift" version = "0.4.0" @@ -1800,9 +1838,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" +checksum = "fb39b166781f92d482534ef4b4b1b2568f42613b53e5b6c160e24cfbfa30926d" dependencies = [ "either", "rayon-core", @@ -1829,9 +1867,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" +checksum = "f450ad9c3b1da563fb6948a8e0fb0fb9269711c9c73d9ea1de5058c79c8d643a" dependencies = [ "bitflags", ] @@ -1937,9 +1975,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.37" +version = 
"0.23.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4" +checksum = "69f9466fb2c14ea04357e91413efb882e2a6d4a406e625449bc0a5d360d53a21" dependencies = [ "once_cell", "ring", @@ -2085,7 +2123,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", ] @@ -2096,7 +2134,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", ] @@ -2518,9 +2556,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.51.1" +version = "1.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c" +checksum = "a91135f59b1cbf38c91e73cf3386fca9bb77915c45ce2771460c9d92f0f3d776" dependencies = [ "bytes", "libc", @@ -2869,9 +2907,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.117" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0551fc1bb415591e3372d0bc4780db7e587d84e2a7e79da121051c5c4b89d0b0" +checksum = "0bf938a0bacb0469e83c1e148908bd7d5a6010354cf4fb73279b7447422e3a89" dependencies = [ "cfg-if", "once_cell", @@ -2882,9 +2920,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.117" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fbdf9a35adf44786aecd5ff89b4563a90325f9da0923236f6104e603c7e86be" +checksum = "eeff24f84126c0ec2db7a449f0c2ec963c6a49efe0698c4242929da037ca28ed" dependencies = [ "quote", 
"wasm-bindgen-macro-support", @@ -2892,9 +2930,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.117" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dca9693ef2bab6d4e6707234500350d8dad079eb508dca05530c85dc3a529ff2" +checksum = "9d08065faf983b2b80a79fd87d8254c409281cf7de75fc4b773019824196c904" dependencies = [ "bumpalo", "proc-macro2", @@ -2905,9 +2943,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.117" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39129a682a6d2d841b6c429d0c51e5cb0ed1a03829d8b3d1e69a011e62cb3d3b" +checksum = "5fd04d9e306f1907bd13c6361b5c6bfc7b3b3c095ed3f8a9246390f8dbdee129" dependencies = [ "unicode-ident", ] @@ -2948,9 +2986,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.94" +version = "0.3.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd70027e39b12f0849461e08ffc50b9cd7688d942c1c8e3c7b22273236b4dd0a" +checksum = "4f2dfbb17949fa2088e5d39408c48368947b86f7834484e87b73de55bc14d97d" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/eventcore-postgres/Cargo.toml b/eventcore-postgres/Cargo.toml index 998f770..c29994b 100644 --- a/eventcore-postgres/Cargo.toml +++ b/eventcore-postgres/Cargo.toml @@ -22,7 +22,7 @@ uuid = { version = "1.19.0", features = ["serde", "v7"] } [dev-dependencies] eventcore-testing = { path = "../eventcore-testing" } futures = "0.3" -mutants = "0.0.3" +mutants = "0.0.4" serde = { version = "1.0", features = ["derive"] } tokio = { version = "1.48.0", default-features = false, features = ["macros", "rt-multi-thread"] } tracing-test = { version = "0.2", features = ["no-env-filter"] } diff --git a/eventcore-stress/Cargo.toml b/eventcore-stress/Cargo.toml index 9f2eeb7..9e84f8b 100644 --- a/eventcore-stress/Cargo.toml +++ b/eventcore-stress/Cargo.toml @@ -38,4 +38,4 @@ serde_json = "1.0" thiserror = "2.0" 
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "sync"] } uuid = { version = "1", features = ["v7"] } -rand = "0.9" +rand = "0.10" diff --git a/eventcore-stress/src/scenarios/transfers.rs b/eventcore-stress/src/scenarios/transfers.rs index 7f958dd..d6646cb 100644 --- a/eventcore-stress/src/scenarios/transfers.rs +++ b/eventcore-stress/src/scenarios/transfers.rs @@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use eventcore::{RetryPolicy, StreamId, execute}; use eventcore_types::EventStore; -use rand::Rng; +use rand::RngExt; use crate::config::{BackendChoice, StressConfig}; use crate::domain::{TransferEvent, TransferMoney, new_stream_id, test_amount}; diff --git a/eventcore-testing/Cargo.toml b/eventcore-testing/Cargo.toml index e740cd5..cc141d2 100644 --- a/eventcore-testing/Cargo.toml +++ b/eventcore-testing/Cargo.toml @@ -14,7 +14,7 @@ eventcore = { version = "0.7.0", path = "../eventcore" } eventcore-memory = { version = "0.7.0", path = "../eventcore-memory" } eventcore-types = { version = "0.7.0", path = "../eventcore-types" } nutype = { version = "0.6.2", features = ["serde"] } -rand = "0.9.2" +rand = "0.10" serde = { version = "1.0", features = ["derive"] } thiserror = "2.0.18" uuid = { version = "1.19.0", features = ["v7"] } diff --git a/eventcore-testing/src/chaos.rs b/eventcore-testing/src/chaos.rs index ded9df8..87a934f 100644 --- a/eventcore-testing/src/chaos.rs +++ b/eventcore-testing/src/chaos.rs @@ -5,7 +5,7 @@ use eventcore_types::{ StreamVersion, StreamWrites, }; use nutype::nutype; -use rand::{Rng, SeedableRng, random, rngs::StdRng}; +use rand::{RngExt, SeedableRng, random, rngs::StdRng}; /// Probability of injecting read/write failures for chaos testing. 
/// diff --git a/eventcore/Cargo.toml b/eventcore/Cargo.toml index 32fd33a..3300fe3 100644 --- a/eventcore/Cargo.toml +++ b/eventcore/Cargo.toml @@ -21,7 +21,7 @@ eventcore-types = { version = "0.7.0", path = "../eventcore-types" } # Runtime dependencies for execute() and retry logic chrono = { version = "0.4.42", features = ["serde"] } -rand = "0.9.2" +rand = "0.10" serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0" thiserror = "2.0.17" From f4f1a62d038c2f70769d02ee28a36f5cc0906048 Mon Sep 17 00:00:00 2001 From: AI Bot Date: Wed, 15 Apr 2026 11:32:17 -0700 Subject: [PATCH 3/4] fix(stress): truncate postgres tables before stress test runs The projection stress test scenario reported Correctness: FAIL on PostgreSQL because the database contained stale events from contract tests and previous runs. The first batch of events (LIMIT 1000) was entirely non-stress-test events that failed deserialization silently. Add clean_postgres_database() that truncates eventcore_events and eventcore_subscription_versions before each stress test run when using the postgres backend. TRUNCATE is used instead of DELETE to bypass the row-level delete-prevention trigger. 
Closes #371 --- Cargo.lock | 1 + eventcore-stress/Cargo.toml | 1 + eventcore-stress/src/backends.rs | 22 ++++++++++++++++++++++ eventcore-stress/src/main.rs | 19 +++++++++++++++++++ 4 files changed, 43 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index ae2d11e..cee9393 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -727,6 +727,7 @@ dependencies = [ "rand 0.10.1", "serde", "serde_json", + "sqlx", "thiserror", "tokio", "uuid", diff --git a/eventcore-stress/Cargo.toml b/eventcore-stress/Cargo.toml index 9e84f8b..0b5f6b9 100644 --- a/eventcore-stress/Cargo.toml +++ b/eventcore-stress/Cargo.toml @@ -39,3 +39,4 @@ thiserror = "2.0" tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "sync"] } uuid = { version = "1", features = ["v7"] } rand = "0.10" +sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres"] } diff --git a/eventcore-stress/src/backends.rs b/eventcore-stress/src/backends.rs index 083667c..9a27d45 100644 --- a/eventcore-stress/src/backends.rs +++ b/eventcore-stress/src/backends.rs @@ -1,5 +1,7 @@ use std::env; +use sqlx::postgres::PgPoolOptions; + use crate::config::BackendChoice; /// Create the postgres connection string from environment variables. @@ -12,6 +14,26 @@ pub fn postgres_connection_string() -> String { format!("postgres://{user}:{password}@{host}:{port}/{db}") } +/// Truncate all eventcore tables in the PostgreSQL database. +/// +/// This removes stale data from contract tests and previous stress test runs +/// so each stress test starts from a clean state. TRUNCATE bypasses the +/// row-level delete-prevention trigger on `eventcore_events`. 
+pub async fn clean_postgres_database() -> Result<(), Box<dyn std::error::Error>> { + let conn = postgres_connection_string(); + let pool = PgPoolOptions::new() + .max_connections(1) + .connect(&conn) + .await?; + + sqlx::query("TRUNCATE TABLE eventcore_events, eventcore_subscription_versions") + .execute(&pool) + .await?; + + println!("Cleaned PostgreSQL database (truncated eventcore tables)"); + Ok(()) +} + /// Print which backend is being used. pub fn print_backend_info(backend: &BackendChoice) { match backend { diff --git a/eventcore-stress/src/main.rs b/eventcore-stress/src/main.rs index a129d6f..8620f1d 100644 --- a/eventcore-stress/src/main.rs +++ b/eventcore-stress/src/main.rs @@ -154,6 +154,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { iterations, }; backends::print_backend_info(&backend); + if matches!(backend, BackendChoice::Postgres) { + backends::clean_postgres_database().await?; + } let report = scenarios::contention::run(&config).await; print!("{report}"); } @@ -172,6 +175,9 @@ iterations, }; backends::print_backend_info(&backend); + if matches!(backend, BackendChoice::Postgres) { + backends::clean_postgres_database().await?; + } let report = scenarios::transfers::run(&config, accounts).await; print!("{report}"); } @@ -189,6 +195,9 @@ iterations, }; backends::print_backend_info(&backend); + if matches!(backend, BackendChoice::Postgres) { + backends::clean_postgres_database().await?; + } let report = scenarios::throughput::run(&config).await; print!("{report}"); } @@ -206,6 +215,9 @@ iterations, }; backends::print_backend_info(&backend); + if matches!(backend, BackendChoice::Postgres) { + backends::clean_postgres_database().await?; + } let report = scenarios::projection::run(&config).await; print!("{report}"); } @@ -223,6 +235,9 @@ iterations, }; backends::print_backend_info(&backend); + if matches!(backend, BackendChoice::Postgres)
{ + backends::clean_postgres_database().await?; + } if let Some(report) = scenarios::pool_saturation::run(&config).await { print!("{report}"); } @@ -241,6 +256,10 @@ }; backends::print_backend_info(&backend); + if matches!(backend, BackendChoice::Postgres) { + backends::clean_postgres_database().await?; + } + println!("\n--- Running all scenarios ---\n"); let report = scenarios::contention::run(&config).await; From 0f286543708dbcc853643957d2991750800416fc Mon Sep 17 00:00:00 2001 From: AI Bot Date: Wed, 15 Apr 2026 11:48:43 -0700 Subject: [PATCH 4/4] fix(stress): truncate postgres before projection in run-all The projection scenario needs a clean database for its correctness check (expects exactly N streams). Previous scenarios in run-all (contention, throughput, transfers) fill the table with unrelated BankAccountEvents. Batch-mode projection reads only the first 1000 events which were all from the contention scenario, finding 1 stream instead of 50. Add a truncation step before the projection scenario within run-all so it starts from a clean slate, matching the behavior of in-memory and SQLite backends which create fresh stores per scenario. --- eventcore-stress/src/main.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/eventcore-stress/src/main.rs b/eventcore-stress/src/main.rs index 8620f1d..7f35a85 100644 --- a/eventcore-stress/src/main.rs +++ b/eventcore-stress/src/main.rs @@ -271,6 +271,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let report = scenarios::transfers::run(&config, 10).await; print!("{report}"); + // Projection needs a clean store to validate correctness + // (previous scenarios pollute the database with unrelated events) + if matches!(backend, BackendChoice::Postgres) { + backends::clean_postgres_database().await?; + } let proj_config = StressConfig { iterations: Some(100), ..config.clone()