diff --git a/Cargo.lock b/Cargo.lock index b1b2905..d8c6f86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -78,9 +90,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "base64ct" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" +checksum = "0e050f626429857a27ddccb31e0aca21356bfa709c04041aefddac081a8f068a" [[package]] name = "bitflags" @@ -126,9 +138,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.48" +version = "1.2.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c481bdbf0ed3b892f6f806287d72acd515b352a4ec27a208489b8c1bc839633a" +checksum = "90583009037521a116abf44494efecd645ba48b6622457080f080b85544e2215" dependencies = [ "find-msvc-tools", "shlex", @@ -389,13 +401,21 @@ dependencies = [ "criterion", "futures", "hostname", + "metrics", + "metrics-util", + "opentelemetry", + "opentelemetry_sdk", + "ordered-float", "rand 0.9.2", "serde", "serde_json", "sqlx", - "thiserror", + "thiserror 2.0.17", "tokio", "tracing", + "tracing-fluent-assertions", + "tracing-opentelemetry", + "tracing-subscriber", "uuid", ] @@ -408,6 +428,12 @@ dependencies = [ "serde", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "equivalent" version = "1.0.2" @@ -607,6 +633,12 @@ dependencies = [ "wasip2", ] +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + [[package]] name = "half" version = "2.7.1" @@ -775,9 +807,9 @@ checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" [[package]] name = "icu_properties" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e93fcd3157766c0c8da2f8cff6ce651a31f0810eaa1c51ec363ef790bbb5fb99" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" dependencies = [ "icu_collections", "icu_locale_core", @@ -789,9 +821,9 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02845b3647bb045f1100ecd6480ff52f34c35f82d9880e029d329c21d1054899" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" [[package]] name = "icu_provider" @@ -886,9 +918,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.177" +version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" +checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" [[package]] name = "libm" @@ -934,9 +966,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.28" +version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "md-5" @@ -954,17 +986,63 @@ version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +[[package]] +name = "metrics" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-util" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15b482df36c13dd1869d73d14d28cd4855fbd6cfc32294bee109908a9f4a4ed7" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.15.5", + "indexmap", + "metrics", + "ordered-float", + "quanta", + "radix_trie", + "sketches-ddsketch", +] + [[package]] name = "mio" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" +checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", "wasi", "windows-sys 0.61.2", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "num-bigint-dig" version = "0.8.6" @@ -1023,6 +1101,48 @@ version = "11.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" +[[package]] +name = "opentelemetry" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 1.0.69", + "tracing", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror 1.0.69", + "tracing", +] + +[[package]] +name = "ordered-float" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bb71e1b3fa6ca1c61f383464aaf2bb0e2f8e772a1f01d486832464de363b951" +dependencies = [ + "num-traits", +] + [[package]] name = "parking" version = "2.2.1" @@ -1134,6 +1254,12 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "portable-atomic" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" + [[package]] name = "potential_utf" version = "0.1.4" @@ -1161,6 +1287,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.42" @@ -1176,6 +1317,16 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -1235,6 +1386,15 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags", +] + [[package]] name = "rayon" version = "1.11.0" @@ -1474,6 +1634,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1499,6 +1668,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "sketches-ddsketch" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" + [[package]] name = "slab" version = "0.4.11" @@ -1585,7 +1760,7 @@ dependencies = [ "serde_json", "sha2", "smallvec", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-stream", "toml", @@ -1629,7 +1804,7 @@ dependencies = [ "sqlx-postgres", "sqlx-sqlite", "syn", - "thiserror", + "thiserror 2.0.17", "tokio", "url", ] @@ -1671,7 +1846,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 2.0.17", "tracing", "uuid", "whoami", @@ -1709,7 +1884,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 2.0.17", "tracing", "uuid", "whoami", @@ -1735,7 +1910,7 @@ dependencies = [ "serde", "serde_urlencoded", "sqlx-core", - "thiserror", + "thiserror 2.0.17", "tracing", "url", "uuid", @@ -1786,13 +1961,33 @@ dependencies = [ "syn", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.17", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -1806,6 +2001,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -1951,6 +2155,61 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-fluent-assertions" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12de1a8c6bcfee614305e836308b596bbac831137a04c61f7e5b0b0bf2cfeaf6" +dependencies = [ + "tracing", + "tracing-core", + "tracing-subscriber", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -2022,6 +2281,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" @@ -2120,6 +2385,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.26.11" @@ -2148,6 +2423,22 @@ dependencies = [ "wasite", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + [[package]] name = "winapi-util" version = "0.1.11" @@ -2157,6 +2448,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.62.2" diff --git a/Cargo.toml b/Cargo.toml index f3a020c..ab85690 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,8 +25,27 @@ hostname = "0.4" rand = "0.9" futures = "0.3.31" +# Optional telemetry dependencies +tracing-opentelemetry = { version = "0.28", optional = true } +opentelemetry = { version = "0.27", optional = true } +opentelemetry_sdk = { version = "0.27", optional = true } +metrics = { version = "0.24", optional = true } + +[features] +default = [] +telemetry = [ + "dep:tracing-opentelemetry", + "dep:opentelemetry", + "dep:opentelemetry_sdk", + "dep:metrics", +] + [dev-dependencies] criterion = { version = "0.5", features = ["async_tokio", "html_reports"] } +tracing-fluent-assertions = "0.3" +metrics-util = { version = "0.18", features = ["debugging"] } +tracing-subscriber = { version = "0.3", features = ["registry"] } +ordered-float = "4" [[bench]] name = "throughput" diff --git a/src/client.rs b/src/client.rs index 7f8b4ed..f04de51 100644 --- a/src/client.rs +++ b/src/client.rs @@ -48,6 +48,21 @@ impl CancellationPolicyDb { use crate::worker::Worker; +/// Validates that user-provided headers don't use reserved prefixes. +fn validate_headers(headers: &Option>) -> anyhow::Result<()> { + if let Some(headers) = headers { + for key in headers.keys() { + if key.starts_with("durable::") { + anyhow::bail!( + "Header key '{}' uses reserved prefix 'durable::'. User headers cannot start with 'durable::'.", + key + ); + } + } + } + Ok(()) +} + /// The main client for interacting with durable workflows. /// /// Use this client to: @@ -355,6 +370,14 @@ where /// Spawn a task by name using a custom executor. /// /// The task must be registered before spawning. + #[cfg_attr( + feature = "telemetry", + tracing::instrument( + name = "durable.client.spawn", + skip(self, executor, params, options), + fields(queue, task_name = %task_name) + ) + )] pub async fn spawn_by_name_with<'e, E>( &self, executor: E, @@ -380,16 +403,30 @@ where } /// Internal spawn implementation without registry validation. + #[allow(unused_mut)] // mut is needed when telemetry feature is enabled async fn spawn_by_name_internal<'e, E>( &self, executor: E, task_name: &str, params: JsonValue, - options: SpawnOptions, + mut options: SpawnOptions, ) -> anyhow::Result where E: Executor<'e, Database = Postgres>, { + // Validate user headers don't use reserved prefix + validate_headers(&options.headers)?; + + // Inject trace context into headers for distributed tracing + #[cfg(feature = "telemetry")] + { + let headers = options.headers.get_or_insert_with(HashMap::new); + crate::telemetry::inject_trace_context(headers); + } + + #[cfg(feature = "telemetry")] + tracing::Span::current().record("queue", &self.queue_name); + let max_attempts = options.max_attempts.unwrap_or(self.default_max_attempts); let db_options = Self::serialize_spawn_options(&options, max_attempts)?; @@ -405,6 +442,9 @@ where .fetch_one(executor) .await?; + #[cfg(feature = "telemetry")] + crate::telemetry::record_task_spawned(&self.queue_name, task_name); + Ok(SpawnResult { task_id: row.task_id, run_id: row.run_id, @@ -452,6 +492,14 @@ where } /// Emit an event to a queue (defaults to this client's queue) + #[cfg_attr( + feature = "telemetry", + tracing::instrument( + name = "durable.client.emit_event", + skip(self, payload), + fields(queue, event_name = %event_name) + ) + )] pub async fn emit_event( &self, event_name: &str, @@ -461,6 +509,10 @@ where anyhow::ensure!(!event_name.is_empty(), "event_name must be non-empty"); let queue = queue_name.unwrap_or(&self.queue_name); + + #[cfg(feature = "telemetry")] + tracing::Span::current().record("queue", queue); + let payload_json = serde_json::to_value(payload)?; let query = "SELECT durable.emit_event($1, $2, $3)"; @@ -471,6 +523,9 @@ where .execute(&self.pool) .await?; + #[cfg(feature = "telemetry")] + crate::telemetry::record_event_emitted(queue, event_name); + Ok(()) } diff --git a/src/context.rs b/src/context.rs index d81acc9..335d45a 100644 --- a/src/context.rs +++ b/src/context.rs @@ -156,6 +156,14 @@ where /// stripe::charge(amount, &idempotency_key).await /// }).await?; /// ``` + #[cfg_attr( + feature = "telemetry", + tracing::instrument( + name = "durable.task.step", + skip(self, f), + fields(task_id = %self.task_id, step_name = %name) + ) + )] pub async fn step(&mut self, name: &str, f: F) -> TaskResult where T: Serialize + DeserializeOwned + Send, @@ -174,8 +182,21 @@ where let result = f().await?; // Persist checkpoint (also extends claim lease) + #[cfg(feature = "telemetry")] + let checkpoint_start = std::time::Instant::now(); + self.persist_checkpoint(&checkpoint_name, &result).await?; + #[cfg(feature = "telemetry")] + { + let duration = checkpoint_start.elapsed().as_secs_f64(); + crate::telemetry::record_checkpoint_duration( + &self.queue_name, + &self.task.task_name, + duration, + ); + } + Ok(result) } @@ -225,6 +246,14 @@ where /// /// Wake time is computed using the database clock to ensure consistency /// with the scheduler and enable deterministic testing via `durable.fake_now`. + #[cfg_attr( + feature = "telemetry", + tracing::instrument( + name = "durable.task.sleep_for", + skip(self), + fields(task_id = %self.task_id, duration_ms = duration.as_millis() as u64) + ) + )] pub async fn sleep_for(&mut self, name: &str, duration: std::time::Duration) -> TaskResult<()> { validate_user_name(name)?; let checkpoint_name = self.get_checkpoint_name(name); @@ -268,6 +297,14 @@ where /// Some(Duration::from_secs(7 * 24 * 3600)), /// ).await?; /// ``` + #[cfg_attr( + feature = "telemetry", + tracing::instrument( + name = "durable.task.await_event", + skip(self, timeout), + fields(task_id = %self.task_id, event_name = %event_name) + ) + )] pub async fn await_event( &mut self, event_name: &str, @@ -323,6 +360,14 @@ where /// updates the payload (last write wins). Tasks waiting for this event /// are woken with the payload at the time of the write that woke them; /// subsequent writes do not propagate to already-woken tasks. + #[cfg_attr( + feature = "telemetry", + tracing::instrument( + name = "durable.task.emit_event", + skip(self, payload), + fields(task_id = %self.task_id, event_name = %event_name) + ) + )] pub async fn emit_event(&self, event_name: &str, payload: &T) -> TaskResult<()> { if event_name.is_empty() { return Err(TaskError::Failed(anyhow::anyhow!( @@ -352,6 +397,14 @@ where /// /// # Errors /// Returns `TaskError::Control(Cancelled)` if the task was cancelled. + #[cfg_attr( + feature = "telemetry", + tracing::instrument( + name = "durable.task.heartbeat", + skip(self), + fields(task_id = %self.task_id) + ) + )] pub async fn heartbeat(&self, duration: Option) -> TaskResult<()> { let extend_by = duration .map(|d| d.as_secs() as i32) @@ -454,6 +507,14 @@ where /// let r1: ItemResult = ctx.join("item-1", h1).await?; /// let r2: ItemResult = ctx.join("item-2", h2).await?; /// ``` + #[cfg_attr( + feature = "telemetry", + tracing::instrument( + name = "durable.task.spawn", + skip(self, params, options), + fields(task_id = %self.task_id, subtask_name = T::NAME) + ) + )] pub async fn spawn( &mut self, name: &str, @@ -515,6 +576,19 @@ where options: SpawnOptions, ) -> TaskResult> { validate_user_name(name)?; + + // Validate headers don't use reserved prefix + if let Some(ref headers) = options.headers { + for key in headers.keys() { + if key.starts_with("durable::") { + return Err(TaskError::Failed(anyhow::anyhow!( + "Header key '{}' uses reserved prefix 'durable::'. User headers cannot start with 'durable::'.", + key + ))); + } + } + } + let checkpoint_name = self.get_checkpoint_name(&format!("$spawn:{name}")); // Return cached task_id if already spawned @@ -574,8 +648,7 @@ where /// /// # Arguments /// - /// * `name` - Unique name for this join operation (used for checkpointing). - /// Uniqueness is constrained just within this task, not globally or for child tasks. + /// * `name` - Unique name for this join operation (used for checkpointing) /// * `handle` - The [`TaskHandle`] returned by [`spawn`](Self::spawn) /// /// # Errors @@ -590,6 +663,14 @@ where /// // ... do other work ... /// let result: ComputeResult = ctx.join("compute", handle).await?; /// ``` + #[cfg_attr( + feature = "telemetry", + tracing::instrument( + name = "durable.task.join", + skip(self, handle), + fields(task_id = %self.task_id, child_task_id = %handle.task_id) + ) + )] pub async fn join( &mut self, name: &str, diff --git a/src/lib.rs b/src/lib.rs index c7190db..2b101de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,8 @@ mod client; mod context; mod error; mod task; +#[cfg(feature = "telemetry")] +pub mod telemetry; mod types; mod worker; diff --git a/src/telemetry/metrics.rs b/src/telemetry/metrics.rs new file mode 100644 index 0000000..32763ea --- /dev/null +++ b/src/telemetry/metrics.rs @@ -0,0 +1,358 @@ +//! Metrics definitions and recording helpers for the durable execution system. +//! +//! All metrics are prefixed with `durable_` and use Prometheus naming conventions. + +use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; + +// Metric name constants +pub const TASKS_SPAWNED_TOTAL: &str = "durable_tasks_spawned_total"; +pub const TASKS_CLAIMED_TOTAL: &str = "durable_tasks_claimed_total"; +pub const TASKS_COMPLETED_TOTAL: &str = "durable_tasks_completed_total"; +pub const TASKS_FAILED_TOTAL: &str = "durable_tasks_failed_total"; +pub const EVENTS_EMITTED_TOTAL: &str = "durable_events_emitted_total"; + +pub const WORKER_CONCURRENT_TASKS: &str = "durable_worker_concurrent_tasks"; +pub const WORKER_ACTIVE: &str = "durable_worker_active"; + +pub const TASK_EXECUTION_DURATION: &str = "durable_task_execution_duration_seconds"; +pub const TASK_CLAIM_DURATION: &str = "durable_task_claim_duration_seconds"; +pub const CHECKPOINT_DURATION: &str = "durable_checkpoint_duration_seconds"; + +/// Register all metric descriptions. Called once during telemetry initialization. +pub fn register_metrics() { + // Counters + describe_counter!(TASKS_SPAWNED_TOTAL, "Total number of tasks spawned"); + describe_counter!( + TASKS_CLAIMED_TOTAL, + "Total number of tasks claimed by workers" + ); + describe_counter!( + TASKS_COMPLETED_TOTAL, + "Total number of tasks that completed successfully" + ); + describe_counter!(TASKS_FAILED_TOTAL, "Total number of tasks that failed"); + describe_counter!(EVENTS_EMITTED_TOTAL, "Total number of events emitted"); + + // Gauges + describe_gauge!( + WORKER_CONCURRENT_TASKS, + "Number of tasks currently being executed by this worker" + ); + describe_gauge!( + WORKER_ACTIVE, + "Whether the worker is active (1) or shut down (0)" + ); + + // Histograms + describe_histogram!( + TASK_EXECUTION_DURATION, + "Duration of task execution in seconds" + ); + describe_histogram!( + TASK_CLAIM_DURATION, + "Duration of task claim operation in seconds" + ); + describe_histogram!( + CHECKPOINT_DURATION, + "Duration of checkpoint persistence in seconds" + ); +} + +// Helper functions for recording metrics + +/// Record a task spawn event +pub fn record_task_spawned(queue: &str, task_name: &str) { + counter!(TASKS_SPAWNED_TOTAL, "queue" => queue.to_string(), "task_name" => task_name.to_string()) + .increment(1); +} + +/// Record a task claim event +pub fn record_task_claimed(queue: &str) { + counter!(TASKS_CLAIMED_TOTAL, "queue" => queue.to_string()).increment(1); +} + +/// Record a successful task completion +pub fn record_task_completed(queue: &str, task_name: &str) { + counter!(TASKS_COMPLETED_TOTAL, "queue" => queue.to_string(), "task_name" => task_name.to_string()) + .increment(1); +} + +/// Record a task failure +pub fn record_task_failed(queue: &str, task_name: &str, error_type: &str) { + counter!(TASKS_FAILED_TOTAL, "queue" => queue.to_string(), "task_name" => task_name.to_string(), "error_type" => error_type.to_string()) + .increment(1); +} + +/// Record an event emission +pub fn record_event_emitted(queue: &str, event_name: &str) { + counter!(EVENTS_EMITTED_TOTAL, "queue" => queue.to_string(), "event_name" => event_name.to_string()) + .increment(1); +} + +/// Set the current number of concurrent tasks for a worker +pub fn set_worker_concurrent_tasks(queue: &str, worker_id: &str, count: usize) { + gauge!(WORKER_CONCURRENT_TASKS, "queue" => queue.to_string(), "worker_id" => worker_id.to_string()) + .set(count as f64); +} + +/// Set whether a worker is active +pub fn set_worker_active(queue: &str, worker_id: &str, active: bool) { + gauge!(WORKER_ACTIVE, "queue" => queue.to_string(), "worker_id" => worker_id.to_string()) + .set(if active { 1.0 } else { 0.0 }); +} + +/// Record task execution duration +pub fn record_task_execution_duration( + queue: &str, + task_name: &str, + outcome: &str, + duration_secs: f64, +) { + histogram!(TASK_EXECUTION_DURATION, "queue" => queue.to_string(), "task_name" => task_name.to_string(), "outcome" => outcome.to_string()) + .record(duration_secs); +} + +/// Record task claim duration +pub fn record_task_claim_duration(queue: &str, duration_secs: f64) { + histogram!(TASK_CLAIM_DURATION, "queue" => queue.to_string()).record(duration_secs); +} + +/// Record checkpoint persistence duration +pub fn record_checkpoint_duration(queue: &str, task_name: &str, duration_secs: f64) { + histogram!(CHECKPOINT_DURATION, "queue" => queue.to_string(), "task_name" => task_name.to_string()) + .record(duration_secs); +} + +#[cfg(test)] +mod tests { + #![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] + + use super::*; + use metrics::with_local_recorder; + use metrics_util::CompositeKey; + use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshot}; + use ordered_float::OrderedFloat; + + fn find_counter(snapshot: Snapshot, name: &str) -> Option<(CompositeKey, u64)> { + snapshot + .into_vec() + .into_iter() + .find(|(key, _, _, _)| key.key().name() == name) + .map(|(key, _, _, value)| { + let count = match value { + DebugValue::Counter(c) => c, + _ => panic!("Expected counter"), + }; + (key, count) + }) + } + + fn find_gauge(snapshot: Snapshot, name: &str) -> Option<(CompositeKey, f64)> { + snapshot + .into_vec() + .into_iter() + .find(|(key, _, _, _)| key.key().name() == name) + .map(|(key, _, _, value)| { + let gauge_value = match value { + DebugValue::Gauge(g) => g.0, + _ => panic!("Expected gauge"), + }; + (key, gauge_value) + }) + } + + fn find_histogram( + snapshot: Snapshot, + name: &str, + ) -> Option<(CompositeKey, Vec>)> { + snapshot + .into_vec() + .into_iter() + .find(|(key, _, _, _)| key.key().name() == name) + .map(|(key, _, _, value)| { + let values = match value { + DebugValue::Histogram(h) => h, + _ => panic!("Expected histogram"), + }; + (key, values) + }) + } + + fn get_label<'a>(key: &'a CompositeKey, label_name: &str) -> Option<&'a str> { + key.key() + .labels() + .find(|l| l.key() == label_name) + .map(|l| l.value()) + } + + #[test] + fn test_record_task_spawned() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + with_local_recorder(&recorder, || { + record_task_spawned("test_queue", "MyTask"); + }); + + let snapshot = snapshotter.snapshot(); + let (key, count) = find_counter(snapshot, TASKS_SPAWNED_TOTAL).unwrap(); + assert_eq!(count, 1); + assert_eq!(get_label(&key, "queue"), Some("test_queue")); + assert_eq!(get_label(&key, "task_name"), Some("MyTask")); + } + + #[test] + fn test_record_task_claimed() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + with_local_recorder(&recorder, || { + record_task_claimed("claim_queue"); + }); + + let snapshot = snapshotter.snapshot(); + let (key, count) = find_counter(snapshot, TASKS_CLAIMED_TOTAL).unwrap(); + assert_eq!(count, 1); + assert_eq!(get_label(&key, "queue"), Some("claim_queue")); + } + + #[test] + fn test_record_task_completed() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + with_local_recorder(&recorder, || { + record_task_completed("complete_queue", "CompletedTask"); + }); + + let snapshot = snapshotter.snapshot(); + let (key, count) = find_counter(snapshot, TASKS_COMPLETED_TOTAL).unwrap(); + assert_eq!(count, 1); + assert_eq!(get_label(&key, "queue"), Some("complete_queue")); + assert_eq!(get_label(&key, "task_name"), Some("CompletedTask")); + } + + #[test] + fn test_record_task_failed() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + with_local_recorder(&recorder, || { + record_task_failed("fail_queue", "FailedTask", "timeout"); + }); + + let snapshot = snapshotter.snapshot(); + let (key, count) = find_counter(snapshot, TASKS_FAILED_TOTAL).unwrap(); + assert_eq!(count, 1); + assert_eq!(get_label(&key, "queue"), Some("fail_queue")); + assert_eq!(get_label(&key, "task_name"), Some("FailedTask")); + assert_eq!(get_label(&key, "error_type"), Some("timeout")); + } + + #[test] + fn test_record_event_emitted() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + with_local_recorder(&recorder, || { + record_event_emitted("event_queue", "user_created"); + }); + + let snapshot = snapshotter.snapshot(); + let (key, count) = find_counter(snapshot, EVENTS_EMITTED_TOTAL).unwrap(); + assert_eq!(count, 1); + assert_eq!(get_label(&key, "queue"), Some("event_queue")); + assert_eq!(get_label(&key, "event_name"), Some("user_created")); + } + + #[test] + fn test_set_worker_concurrent_tasks() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + with_local_recorder(&recorder, || { + set_worker_concurrent_tasks("worker_queue", "worker-1", 5); + }); + + let snapshot = snapshotter.snapshot(); + let (key, value) = find_gauge(snapshot, WORKER_CONCURRENT_TASKS).unwrap(); + assert!((value - 5.0).abs() < f64::EPSILON); + assert_eq!(get_label(&key, "queue"), Some("worker_queue")); + assert_eq!(get_label(&key, "worker_id"), Some("worker-1")); + } + + #[test] + fn test_set_worker_active() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + with_local_recorder(&recorder, || { + set_worker_active("active_queue", "worker-2", true); + }); + + let snapshot = snapshotter.snapshot(); + let (key, value) = find_gauge(snapshot, WORKER_ACTIVE).unwrap(); + assert!((value - 1.0).abs() < f64::EPSILON); + assert_eq!(get_label(&key, "queue"), Some("active_queue")); + assert_eq!(get_label(&key, "worker_id"), Some("worker-2")); + + with_local_recorder(&recorder, || { + set_worker_active("active_queue", "worker-2", false); + }); + + let snapshot = snapshotter.snapshot(); + let (_, value) = find_gauge(snapshot, WORKER_ACTIVE).unwrap(); + assert!(value.abs() < f64::EPSILON); + } + + #[test] + fn test_record_task_execution_duration() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + with_local_recorder(&recorder, || { + record_task_execution_duration("exec_queue", "ExecTask", "completed", 1.5); + }); + + let snapshot = snapshotter.snapshot(); + let (key, values) = find_histogram(snapshot, TASK_EXECUTION_DURATION).unwrap(); + assert_eq!(values.len(), 1); + assert_eq!(values[0], OrderedFloat(1.5)); + assert_eq!(get_label(&key, "queue"), Some("exec_queue")); + assert_eq!(get_label(&key, "task_name"), Some("ExecTask")); + assert_eq!(get_label(&key, "outcome"), Some("completed")); + } + + #[test] + fn test_record_task_claim_duration() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + with_local_recorder(&recorder, || { + record_task_claim_duration("claim_dur_queue", 0.25); + }); + + let snapshot = snapshotter.snapshot(); + let (key, values) = find_histogram(snapshot, TASK_CLAIM_DURATION).unwrap(); + assert_eq!(values.len(), 1); + assert_eq!(values[0], OrderedFloat(0.25)); + assert_eq!(get_label(&key, "queue"), Some("claim_dur_queue")); + } + + #[test] + fn test_record_checkpoint_duration() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + with_local_recorder(&recorder, || { + record_checkpoint_duration("ckpt_queue", "CkptTask", 0.1); + }); + + let snapshot = snapshotter.snapshot(); + let (key, values) = find_histogram(snapshot, CHECKPOINT_DURATION).unwrap(); + assert_eq!(values.len(), 1); + assert_eq!(values[0], OrderedFloat(0.1)); + assert_eq!(get_label(&key, "queue"), Some("ckpt_queue")); + assert_eq!(get_label(&key, "task_name"), Some("CkptTask")); + } +} diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs new file mode 100644 index 0000000..979053c --- /dev/null +++ b/src/telemetry/mod.rs @@ -0,0 +1,24 @@ +//! Observability helpers for the durable execution system. +//! +//! This module provides: +//! - Metric recording helpers (backend-agnostic via the `metrics` crate) +//! - W3C Trace Context propagation across task boundaries +//! +//! # Feature Flag +//! +//! Enable with the `telemetry` feature: +//! ```toml +//! durable = { version = "0.1", features = ["telemetry"] } +//! ``` +//! +//! # Usage +//! +//! This module does **not** set up exporters. You must configure your own +//! tracing subscriber and metrics recorder in your application. The library +//! will emit metrics and propagate trace context automatically. + +mod metrics; +mod propagation; + +pub use metrics::*; +pub use propagation::{extract_trace_context, inject_trace_context}; diff --git a/src/telemetry/propagation.rs b/src/telemetry/propagation.rs new file mode 100644 index 0000000..da25b08 --- /dev/null +++ b/src/telemetry/propagation.rs @@ -0,0 +1,110 @@ +//! W3C Trace Context propagation for distributed tracing across process boundaries. +//! +//! This module enables trace context to flow from task spawners to task executors, +//! even when they run on different machines communicating via PostgreSQL. +//! +//! The trace context is serialized using the W3C Trace Context standard format: +//! `traceparent: 00-{trace_id}-{span_id}-{flags}` +//! +//! The trace context is stored under a single namespaced key `durable::otel_context` +//! as a serialized JSON object containing the W3C headers. + +use opentelemetry::Context; +use opentelemetry::propagation::TextMapPropagator; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use serde_json::Value as JsonValue; +use std::collections::HashMap; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +/// Key used to store OTEL context in the headers map +const OTEL_CONTEXT_KEY: &str = "durable::otel_context"; + +/// Inject the current span's trace context into a headers map. +/// +/// This should be called at task spawn time to capture the caller's trace context. +/// The trace context is stored as a JSON object under the `durable::otel_context` key. +/// +/// # Example +/// +/// ```ignore +/// let mut headers = HashMap::new(); +/// inject_trace_context(&mut headers); +/// // headers now contains {"durable::otel_context": {"traceparent": "00-...-...-01"}} +/// ``` +pub fn inject_trace_context(headers: &mut HashMap) { + let propagator = TraceContextPropagator::new(); + let cx = tracing::Span::current().context(); + + // Inject into a temporary HashMap + let mut otel_headers: HashMap = HashMap::new(); + propagator.inject_context(&cx, &mut otel_headers); + + // Only store if there's actual context to propagate + if !otel_headers.is_empty() + && let Ok(json_value) = serde_json::to_value(otel_headers) + { + headers.insert(OTEL_CONTEXT_KEY.to_string(), json_value); + } +} + +/// Extract trace context from a headers map. +/// +/// This should be called at task execution time to restore the caller's trace context. +/// The returned `Context` can be used to set the parent of a new span. +/// +/// # Example +/// +/// ```ignore +/// let cx = extract_trace_context(&task.headers); +/// let span = info_span!("task.execute"); +/// span.set_parent(cx); +/// ``` +pub fn extract_trace_context(headers: &HashMap) -> Context { + let propagator = TraceContextPropagator::new(); + + // Extract the OTEL context from the namespaced key + let otel_headers: HashMap = headers + .get(OTEL_CONTEXT_KEY) + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + + propagator.extract(&otel_headers) +} + +/// Check if headers contain trace context. +#[allow(dead_code)] +pub fn has_trace_context(headers: &HashMap) -> bool { + headers.contains_key(OTEL_CONTEXT_KEY) +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + + #[test] + fn test_inject_extract_roundtrip() { + // Note: This test verifies the basic mechanics work. + // Full integration testing requires an active OpenTelemetry context. + let mut headers = HashMap::new(); + + // Without an active span, inject should still work (just won't have context) + inject_trace_context(&mut headers); + + // Extract should return a valid (possibly empty) context + let _cx = extract_trace_context(&headers); + } + + #[test] + fn test_has_trace_context() { + let mut headers = HashMap::new(); + assert!(!has_trace_context(&headers)); + + // Insert a properly structured OTEL context + let mut otel_context = HashMap::new(); + otel_context.insert("traceparent".to_string(), "00-abc-def-01".to_string()); + let json_value = serde_json::to_value(otel_context).unwrap(); + headers.insert(OTEL_CONTEXT_KEY.to_string(), json_value); + assert!(has_trace_context(&headers)); + } +} diff --git a/src/worker.rs b/src/worker.rs index ca46e74..c813af4 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::{RwLock, Semaphore, broadcast, mpsc}; use tokio::time::{Instant, sleep, sleep_until}; +use tracing::Instrument; use uuid::Uuid; use crate::context::TaskContext; @@ -118,6 +119,10 @@ impl Worker { let poll_interval = std::time::Duration::from_secs_f64(options.poll_interval); let fatal_on_lease_timeout = options.fatal_on_lease_timeout; + // Mark worker as active + #[cfg(feature = "telemetry")] + crate::telemetry::set_worker_active(&queue_name, &worker_id, true); + // Semaphore limits concurrent task execution let semaphore = Arc::new(Semaphore::new(concurrency)); @@ -129,6 +134,10 @@ impl Worker { // Shutdown signal received _ = shutdown_rx.recv() => { tracing::info!("Worker shutting down, waiting for in-flight tasks..."); + + #[cfg(feature = "telemetry")] + crate::telemetry::set_worker_active(&queue_name, &worker_id, false); + drop(done_tx); while done_rx.recv().await.is_some() {} tracing::info!("Worker shutdown complete"); @@ -195,6 +204,14 @@ impl Worker { } } + #[cfg_attr( + feature = "telemetry", + tracing::instrument( + name = "durable.worker.claim_tasks", + skip(pool), + fields(queue = %queue_name, worker_id = %worker_id, count = count) + ) + )] async fn claim_tasks( pool: &PgPool, queue_name: &str, @@ -202,6 +219,9 @@ impl Worker { claim_timeout: u64, count: usize, ) -> anyhow::Result> { + #[cfg(feature = "telemetry")] + let start = std::time::Instant::now(); + let query = "SELECT run_id, task_id, attempt, task_name, params, retry_strategy, max_attempts, headers, wake_event, event_payload FROM durable.claim_task($1, $2, $3, $4)"; @@ -214,10 +234,21 @@ impl Worker { .fetch_all(pool) .await?; - rows.into_iter() + let tasks: Vec = rows + .into_iter() .map(TryInto::try_into) - .collect::, _>>() - .map_err(Into::into) + .collect::, _>>()?; + + #[cfg(feature = "telemetry")] + { + let duration = start.elapsed().as_secs_f64(); + crate::telemetry::record_task_claim_duration(queue_name, duration); + for _ in &tasks { + crate::telemetry::record_task_claimed(queue_name); + } + } + + Ok(tasks) } async fn execute_task( @@ -230,10 +261,56 @@ impl Worker { state: State, ) where State: Clone + Send + Sync + 'static, + { + // Create span for task execution, linked to parent trace context if available + let span = tracing::info_span!( + "durable.worker.execute_task", + queue = %queue_name, + task_id = %task.task_id, + run_id = %task.run_id, + task_name = %task.task_name, + attempt = task.attempt, + ); + + // Extract and set parent trace context from headers (for distributed tracing) + #[cfg(feature = "telemetry")] + if let Some(ref headers) = task.headers { + use tracing_opentelemetry::OpenTelemetrySpanExt; + let parent_cx = crate::telemetry::extract_trace_context(headers); + span.set_parent(parent_cx); + } + + Self::execute_task_inner( + pool, + queue_name, + registry, + task, + claim_timeout, + fatal_on_lease_timeout, + state, + ) + .instrument(span) + .await + } + + async fn execute_task_inner( + pool: PgPool, + queue_name: String, + registry: Arc>>, + task: ClaimedTask, + claim_timeout: u64, + fatal_on_lease_timeout: bool, + state: State, + ) where + State: Clone + Send + Sync + 'static, { let task_label = format!("{} ({})", task.task_name, task.task_id); let task_id = task.task_id; let run_id = task.run_id; + #[cfg(feature = "telemetry")] + let task_name = task.task_name.clone(); + #[cfg(feature = "telemetry")] + let queue_name_for_metrics = queue_name.clone(); let start_time = Instant::now(); // Create lease extension channel - TaskContext will notify when lease is extended @@ -396,23 +473,65 @@ impl Worker { return; }; + // Record metrics for task execution + #[cfg(feature = "telemetry")] + let outcome: &str; + match result { Ok(output) => { + #[cfg(feature = "telemetry")] + { + outcome = "completed"; + } Self::complete_run(&pool, &queue_name, task.run_id, output).await; + + #[cfg(feature = "telemetry")] + crate::telemetry::record_task_completed(&queue_name_for_metrics, &task_name); } Err(TaskError::Control(ControlFlow::Suspend)) => { // Task suspended - do nothing, scheduler will resume it + #[cfg(feature = "telemetry")] + { + outcome = "suspended"; + } tracing::debug!("Task {} suspended", task_label); } Err(TaskError::Control(ControlFlow::Cancelled)) => { // Task cancelled - do nothing + #[cfg(feature = "telemetry")] + { + outcome = "cancelled"; + } tracing::info!("Task {} was cancelled", task_label); } - Err(TaskError::Failed(e)) => { + Err(TaskError::Failed(ref e)) => { + #[cfg(feature = "telemetry")] + { + outcome = "failed"; + } tracing::error!("Task {} failed: {}", task_label, e); - Self::fail_run(&pool, &queue_name, task.run_id, &e).await; + Self::fail_run(&pool, &queue_name, task.run_id, e).await; + + #[cfg(feature = "telemetry")] + crate::telemetry::record_task_failed( + &queue_name_for_metrics, + &task_name, + "task_error", + ); } } + + // Record execution duration + #[cfg(feature = "telemetry")] + { + let duration = start_time.elapsed().as_secs_f64(); + crate::telemetry::record_task_execution_duration( + &queue_name_for_metrics, + &task_name, + outcome, + duration, + ); + } } async fn complete_run(pool: &PgPool, queue_name: &str, run_id: Uuid, result: JsonValue) { diff --git a/tests/spawn_test.rs b/tests/spawn_test.rs index 5e0dc2c..1474326 100644 --- a/tests/spawn_test.rs +++ b/tests/spawn_test.rs @@ -487,3 +487,75 @@ async fn test_spawn_with_transaction_rollback(pool: PgPool) -> sqlx::Result<()> Ok(()) } + +// ============================================================================ +// Reserved Header Prefix Validation Tests +// ============================================================================ + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_spawn_rejects_reserved_header_prefix(pool: PgPool) -> sqlx::Result<()> { + let client = create_client(pool.clone(), "reserved_headers").await; + client.create_queue(None).await.unwrap(); + client.register::().await; + + let mut headers = HashMap::new(); + headers.insert("durable::custom".to_string(), serde_json::json!("value")); + + let options = SpawnOptions { + headers: Some(headers), + ..Default::default() + }; + + let result = client + .spawn_with_options::( + EchoParams { + message: "test".to_string(), + }, + options, + ) + .await; + + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!( + err.contains("reserved prefix 'durable::'"), + "Error should mention reserved prefix, got: {}", + err + ); + + Ok(()) +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_spawn_allows_non_reserved_headers(pool: PgPool) -> sqlx::Result<()> { + let client = create_client(pool.clone(), "allowed_headers").await; + client.create_queue(None).await.unwrap(); + client.register::().await; + + let mut headers = HashMap::new(); + // These should all be allowed - they don't start with "durable::" + headers.insert("my-header".to_string(), serde_json::json!("value")); + headers.insert("durable".to_string(), serde_json::json!("no colons")); + headers.insert("durable:single".to_string(), serde_json::json!("one colon")); + + let options = SpawnOptions { + headers: Some(headers), + ..Default::default() + }; + + let result = client + .spawn_with_options::( + EchoParams { + message: "test".to_string(), + }, + options, + ) + .await; + + assert!( + result.is_ok(), + "Headers without 'durable::' prefix should be allowed" + ); + + Ok(()) +} diff --git a/tests/telemetry_test.rs b/tests/telemetry_test.rs new file mode 100644 index 0000000..e1c5c72 --- /dev/null +++ b/tests/telemetry_test.rs @@ -0,0 +1,445 @@ +//! Integration tests for telemetry (spans and metrics). +//! +//! These tests verify that spans are created and metrics are recorded during +//! actual task execution. +//! +//! Run with: `cargo test --features telemetry telemetry_test` + +#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] +#![cfg(feature = "telemetry")] + +mod common; + +use common::helpers::wait_for_task_state; +use common::tasks::{EchoParams, EchoTask, FailingParams, FailingTask, MultiStepTask}; +use durable::{Durable, MIGRATOR, WorkerOptions}; +use metrics_util::CompositeKey; +use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshot}; +use ordered_float::OrderedFloat; +use sqlx::PgPool; +use std::sync::OnceLock; +use std::time::Duration; + +/// Helper to create a Durable client from the test pool. +async fn create_client(pool: PgPool, queue_name: &str) -> Durable { + Durable::builder() + .pool(pool) + .queue_name(queue_name) + .build() + .await + .expect("Failed to create Durable client") +} + +// ============================================================================ +// Metrics Helper Functions +// ============================================================================ + +fn find_counter_with_label( + snapshot: Snapshot, + name: &str, + label_key: &str, + label_value: &str, +) -> Option<(CompositeKey, u64)> { + snapshot + .into_vec() + .into_iter() + .find(|(key, _, _, _)| { + key.key().name() == name + && key + .key() + .labels() + .any(|l| l.key() == label_key && l.value() == label_value) + }) + .map(|(key, _, _, value)| { + let count = match value { + DebugValue::Counter(c) => c, + _ => panic!("Expected counter"), + }; + (key, count) + }) +} + +fn find_gauge_with_label( + snapshot: Snapshot, + name: &str, + label_key: &str, + label_value: &str, +) -> Option<(CompositeKey, f64)> { + snapshot + .into_vec() + .into_iter() + .find(|(key, _, _, _)| { + key.key().name() == name + && key + .key() + .labels() + .any(|l| l.key() == label_key && l.value() == label_value) + }) + .map(|(key, _, _, value)| { + let gauge_value = match value { + DebugValue::Gauge(g) => g.0, + _ => panic!("Expected gauge"), + }; + (key, gauge_value) + }) +} + +#[allow(dead_code)] +fn find_histogram( + snapshot: Snapshot, + name: &str, +) -> Option<(CompositeKey, Vec>)> { + snapshot + .into_vec() + .into_iter() + .find(|(key, _, _, _)| key.key().name() == name) + .map(|(key, _, _, value)| { + let values = match value { + DebugValue::Histogram(h) => h, + _ => panic!("Expected histogram"), + }; + (key, values) + }) +} + +fn get_label<'a>(key: &'a CompositeKey, label_name: &str) -> Option<&'a str> { + key.key() + .labels() + .find(|l| l.key() == label_name) + .map(|l| l.value()) +} + +fn count_metrics_by_name(snapshot: Snapshot, name: &str) -> usize { + snapshot + .into_vec() + .into_iter() + .filter(|(key, _, _, _)| key.key().name() == name) + .count() +} + +// Global snapshotter for tests - recorder installed once +static GLOBAL_SNAPSHOTTER: OnceLock = OnceLock::new(); + +fn get_snapshotter() -> metrics_util::debugging::Snapshotter { + GLOBAL_SNAPSHOTTER + .get_or_init(|| { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + recorder.install().expect("Failed to install recorder"); + snapshotter + }) + .clone() +} + +// ============================================================================ +// Metrics Integration Tests +// ============================================================================ + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_task_lifecycle_metrics(pool: PgPool) -> sqlx::Result<()> { + let snapshotter = get_snapshotter(); + let queue_name = "metrics_lifecycle"; + + let client = create_client(pool.clone(), queue_name).await; + client.create_queue(None).await.unwrap(); + client.register::().await; + + // Spawn a task + let spawn_result = client + .spawn::(EchoParams { + message: "test".to_string(), + }) + .await + .expect("Failed to spawn task"); + + // Start worker + let worker = client + .start_worker(WorkerOptions { + poll_interval: 0.05, + claim_timeout: 30, + ..Default::default() + }) + .await; + + // Wait for task to complete + wait_for_task_state( + &pool, + queue_name, + spawn_result.task_id, + "completed", + Duration::from_secs(10), + ) + .await + .expect("Task should complete"); + + worker.shutdown().await; + + // Give a moment for metrics to flush + tokio::time::sleep(Duration::from_millis(100)).await; + + // Verify task spawned metric for this specific queue + let snapshot = snapshotter.snapshot(); + let spawn_result = + find_counter_with_label(snapshot, "durable_tasks_spawned_total", "queue", queue_name); + assert!( + spawn_result.is_some(), + "Task spawned metric should exist for queue {}", + queue_name + ); + if let Some((key, count)) = spawn_result { + assert!(count >= 1, "Task spawn count should be at least 1"); + assert_eq!(get_label(&key, "task_name"), Some("echo")); + } + + // Verify task claimed metric exists for this queue + let snapshot = snapshotter.snapshot(); + let claimed = + find_counter_with_label(snapshot, "durable_tasks_claimed_total", "queue", queue_name); + assert!( + claimed.is_some(), + "Task claimed metric should exist for queue {}", + queue_name + ); + + // Verify task completed metric exists for this queue + let snapshot = snapshotter.snapshot(); + let completed = find_counter_with_label( + snapshot, + "durable_tasks_completed_total", + "queue", + queue_name, + ); + assert!( + completed.is_some(), + "Task completed metric should exist for queue {}", + queue_name + ); + + Ok(()) +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_task_failure_metrics(pool: PgPool) -> sqlx::Result<()> { + let snapshotter = get_snapshotter(); + + // Get baseline + let baseline = snapshotter.snapshot(); + let baseline_failed_count = count_metrics_by_name(baseline, "durable_tasks_failed_total"); + + let client = create_client(pool.clone(), "metrics_failure").await; + client.create_queue(None).await.unwrap(); + client.register::().await; + + // Spawn a task that will fail + let spawn_result = client + .spawn::(FailingParams { + error_message: "intentional failure".to_string(), + }) + .await + .expect("Failed to spawn task"); + + let worker = client + .start_worker(WorkerOptions { + poll_interval: 0.05, + claim_timeout: 30, + ..Default::default() + }) + .await; + + // Wait for task to fail + wait_for_task_state( + &pool, + "metrics_failure", + spawn_result.task_id, + "failed", + Duration::from_secs(10), + ) + .await + .expect("Task should fail"); + + worker.shutdown().await; + + tokio::time::sleep(Duration::from_millis(100)).await; + + let snapshot = snapshotter.snapshot(); + + // Verify task failed metric increased + let failed_count = count_metrics_by_name(snapshot, "durable_tasks_failed_total"); + assert!( + failed_count > baseline_failed_count, + "Task failed count should have increased" + ); + + Ok(()) +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_worker_gauge_metrics(pool: PgPool) -> sqlx::Result<()> { + let snapshotter = get_snapshotter(); + let queue_name = "metrics_worker"; + + let client = create_client(pool.clone(), queue_name).await; + client.create_queue(None).await.unwrap(); + client.register::().await; + + let worker = client + .start_worker(WorkerOptions { + poll_interval: 0.05, + claim_timeout: 30, + ..Default::default() + }) + .await; + + // Give the worker time to set its active gauge + tokio::time::sleep(Duration::from_millis(200)).await; + + // Check worker active gauge while running for this specific queue + let snapshot = snapshotter.snapshot(); + let worker_active = + find_gauge_with_label(snapshot, "durable_worker_active", "queue", queue_name); + assert!( + worker_active.is_some(), + "Worker active gauge should be recorded for queue {}", + queue_name + ); + + if let Some((_, value)) = worker_active { + assert!( + (value - 1.0).abs() < f64::EPSILON, + "Worker should be active (value={})", + value + ); + } + + worker.shutdown().await; + + // After shutdown, worker should set gauge to 0 + tokio::time::sleep(Duration::from_millis(100)).await; + + let snapshot = snapshotter.snapshot(); + if let Some((_, value)) = + find_gauge_with_label(snapshot, "durable_worker_active", "queue", queue_name) + { + // The gauge should be 0 after shutdown + assert!( + value.abs() < f64::EPSILON, + "Worker gauge should be 0 after shutdown (value={})", + value + ); + } + + Ok(()) +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_checkpoint_metrics(pool: PgPool) -> sqlx::Result<()> { + let snapshotter = get_snapshotter(); + + // Get baseline + let baseline = snapshotter.snapshot(); + let baseline_ckpt_count = + count_metrics_by_name(baseline, "durable_checkpoint_duration_seconds"); + + let client = create_client(pool.clone(), "metrics_checkpoint").await; + client.create_queue(None).await.unwrap(); + client.register::().await; + + // MultiStepTask has steps which record checkpoint metrics + let spawn_result = client + .spawn::(()) + .await + .expect("Failed to spawn task"); + + let worker = client + .start_worker(WorkerOptions { + poll_interval: 0.05, + claim_timeout: 30, + ..Default::default() + }) + .await; + + wait_for_task_state( + &pool, + "metrics_checkpoint", + spawn_result.task_id, + "completed", + Duration::from_secs(10), + ) + .await + .expect("Task should complete"); + + worker.shutdown().await; + + tokio::time::sleep(Duration::from_millis(100)).await; + + let snapshot = snapshotter.snapshot(); + + // Verify checkpoint duration histogram was recorded + let checkpoint_count = count_metrics_by_name(snapshot, "durable_checkpoint_duration_seconds"); + assert!( + checkpoint_count > baseline_ckpt_count, + "Expected checkpoint duration metrics to increase (baseline: {}, current: {})", + baseline_ckpt_count, + checkpoint_count + ); + + Ok(()) +} + +// ============================================================================ +// Execution Duration Histogram Tests +// ============================================================================ + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_task_execution_duration_metrics(pool: PgPool) -> sqlx::Result<()> { + let snapshotter = get_snapshotter(); + + // Get baseline + let baseline = snapshotter.snapshot(); + let baseline_duration_count = + count_metrics_by_name(baseline, "durable_task_execution_duration_seconds"); + + let client = create_client(pool.clone(), "metrics_exec_dur").await; + client.create_queue(None).await.unwrap(); + client.register::().await; + + let spawn_result = client + .spawn::(EchoParams { + message: "test".to_string(), + }) + .await + .expect("Failed to spawn task"); + + let worker = client + .start_worker(WorkerOptions { + poll_interval: 0.05, + claim_timeout: 30, + ..Default::default() + }) + .await; + + wait_for_task_state( + &pool, + "metrics_exec_dur", + spawn_result.task_id, + "completed", + Duration::from_secs(10), + ) + .await + .expect("Task should complete"); + + worker.shutdown().await; + + tokio::time::sleep(Duration::from_millis(100)).await; + + let snapshot = snapshotter.snapshot(); + + // Verify execution duration histogram was recorded + let duration_count = count_metrics_by_name(snapshot, "durable_task_execution_duration_seconds"); + assert!( + duration_count > baseline_duration_count, + "Expected execution duration metrics to increase" + ); + + Ok(()) +}