Skip to content

Commit bfece75

Browse files
authored
Set up tracing, OTel, and metrics (#16)
* initial rust initialization * added pre-commit * set up sqlx and rust * added license files and initial migration; * added initial impl of client * tests compile * fixed bugs with sqlx types * added documentation * added tests * updated tasks * added a test that mocks the example in README * removed todos * added convenience methods for uuid, rand, now * added handling for spawning and joining subtasks from workflows * added missing sql and test files * merged migrations * cleaned up bad code * cleaned up json handling * made process exit optional on too-long tasks * fixed semaphore ordering * fixed issues with clock skew * improved handling of leases * added comments on sql schema * enforced that claim timeouts must be set * cleaned up and documented sql * added support for transactions that enqueue tasks * added a bunch of tests * documented and tested event semantics * added benchmarks * initial implementation of telemetry * removed exporter setup from crate * telemetry tests pass * removed extra license file * inject otel context as a string for key durable::otel_context * protect durable:: headers for internal use * addressed PR comment
1 parent 4eae5b8 commit bfece75

File tree

11 files changed

+1611
-29
lines changed

11 files changed

+1611
-29
lines changed

Cargo.lock

Lines changed: 318 additions & 21 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,27 @@ hostname = "0.4"
2525
rand = "0.9"
2626
futures = "0.3.31"
2727

28+
# Optional telemetry dependencies
29+
tracing-opentelemetry = { version = "0.28", optional = true }
30+
opentelemetry = { version = "0.27", optional = true }
31+
opentelemetry_sdk = { version = "0.27", optional = true }
32+
metrics = { version = "0.24", optional = true }
33+
34+
[features]
35+
default = []
36+
telemetry = [
37+
"dep:tracing-opentelemetry",
38+
"dep:opentelemetry",
39+
"dep:opentelemetry_sdk",
40+
"dep:metrics",
41+
]
42+
2843
[dev-dependencies]
2944
criterion = { version = "0.5", features = ["async_tokio", "html_reports"] }
45+
tracing-fluent-assertions = "0.3"
46+
metrics-util = { version = "0.18", features = ["debugging"] }
47+
tracing-subscriber = { version = "0.3", features = ["registry"] }
48+
ordered-float = "4"
3049

3150
[[bench]]
3251
name = "throughput"

src/client.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,21 @@ impl CancellationPolicyDb {
4848

4949
use crate::worker::Worker;
5050

51+
/// Validates that user-provided headers don't use reserved prefixes.
52+
fn validate_headers(headers: &Option<HashMap<String, JsonValue>>) -> anyhow::Result<()> {
53+
if let Some(headers) = headers {
54+
for key in headers.keys() {
55+
if key.starts_with("durable::") {
56+
anyhow::bail!(
57+
"Header key '{}' uses reserved prefix 'durable::'. User headers cannot start with 'durable::'.",
58+
key
59+
);
60+
}
61+
}
62+
}
63+
Ok(())
64+
}
65+
5166
/// The main client for interacting with durable workflows.
5267
///
5368
/// Use this client to:
@@ -355,6 +370,14 @@ where
355370
/// Spawn a task by name using a custom executor.
356371
///
357372
/// The task must be registered before spawning.
373+
#[cfg_attr(
374+
feature = "telemetry",
375+
tracing::instrument(
376+
name = "durable.client.spawn",
377+
skip(self, executor, params, options),
378+
fields(queue, task_name = %task_name)
379+
)
380+
)]
358381
pub async fn spawn_by_name_with<'e, E>(
359382
&self,
360383
executor: E,
@@ -380,16 +403,30 @@ where
380403
}
381404

382405
/// Internal spawn implementation without registry validation.
406+
#[allow(unused_mut)] // mut is needed when telemetry feature is enabled
383407
async fn spawn_by_name_internal<'e, E>(
384408
&self,
385409
executor: E,
386410
task_name: &str,
387411
params: JsonValue,
388-
options: SpawnOptions,
412+
mut options: SpawnOptions,
389413
) -> anyhow::Result<SpawnResult>
390414
where
391415
E: Executor<'e, Database = Postgres>,
392416
{
417+
// Validate user headers don't use reserved prefix
418+
validate_headers(&options.headers)?;
419+
420+
// Inject trace context into headers for distributed tracing
421+
#[cfg(feature = "telemetry")]
422+
{
423+
let headers = options.headers.get_or_insert_with(HashMap::new);
424+
crate::telemetry::inject_trace_context(headers);
425+
}
426+
427+
#[cfg(feature = "telemetry")]
428+
tracing::Span::current().record("queue", &self.queue_name);
429+
393430
let max_attempts = options.max_attempts.unwrap_or(self.default_max_attempts);
394431

395432
let db_options = Self::serialize_spawn_options(&options, max_attempts)?;
@@ -405,6 +442,9 @@ where
405442
.fetch_one(executor)
406443
.await?;
407444

445+
#[cfg(feature = "telemetry")]
446+
crate::telemetry::record_task_spawned(&self.queue_name, task_name);
447+
408448
Ok(SpawnResult {
409449
task_id: row.task_id,
410450
run_id: row.run_id,
@@ -452,6 +492,14 @@ where
452492
}
453493

454494
/// Emit an event to a queue (defaults to this client's queue)
495+
#[cfg_attr(
496+
feature = "telemetry",
497+
tracing::instrument(
498+
name = "durable.client.emit_event",
499+
skip(self, payload),
500+
fields(queue, event_name = %event_name)
501+
)
502+
)]
455503
pub async fn emit_event<T: Serialize>(
456504
&self,
457505
event_name: &str,
@@ -461,6 +509,10 @@ where
461509
anyhow::ensure!(!event_name.is_empty(), "event_name must be non-empty");
462510

463511
let queue = queue_name.unwrap_or(&self.queue_name);
512+
513+
#[cfg(feature = "telemetry")]
514+
tracing::Span::current().record("queue", queue);
515+
464516
let payload_json = serde_json::to_value(payload)?;
465517

466518
let query = "SELECT durable.emit_event($1, $2, $3)";
@@ -471,6 +523,9 @@ where
471523
.execute(&self.pool)
472524
.await?;
473525

526+
#[cfg(feature = "telemetry")]
527+
crate::telemetry::record_event_emitted(queue, event_name);
528+
474529
Ok(())
475530
}
476531

src/context.rs

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,14 @@ where
156156
/// stripe::charge(amount, &idempotency_key).await
157157
/// }).await?;
158158
/// ```
159+
#[cfg_attr(
160+
feature = "telemetry",
161+
tracing::instrument(
162+
name = "durable.task.step",
163+
skip(self, f),
164+
fields(task_id = %self.task_id, step_name = %name)
165+
)
166+
)]
159167
pub async fn step<T, F, Fut>(&mut self, name: &str, f: F) -> TaskResult<T>
160168
where
161169
T: Serialize + DeserializeOwned + Send,
@@ -174,8 +182,21 @@ where
174182
let result = f().await?;
175183

176184
// Persist checkpoint (also extends claim lease)
185+
#[cfg(feature = "telemetry")]
186+
let checkpoint_start = std::time::Instant::now();
187+
177188
self.persist_checkpoint(&checkpoint_name, &result).await?;
178189

190+
#[cfg(feature = "telemetry")]
191+
{
192+
let duration = checkpoint_start.elapsed().as_secs_f64();
193+
crate::telemetry::record_checkpoint_duration(
194+
&self.queue_name,
195+
&self.task.task_name,
196+
duration,
197+
);
198+
}
199+
179200
Ok(result)
180201
}
181202

@@ -225,6 +246,14 @@ where
225246
///
226247
/// Wake time is computed using the database clock to ensure consistency
227248
/// with the scheduler and enable deterministic testing via `durable.fake_now`.
249+
#[cfg_attr(
250+
feature = "telemetry",
251+
tracing::instrument(
252+
name = "durable.task.sleep_for",
253+
skip(self),
254+
fields(task_id = %self.task_id, duration_ms = duration.as_millis() as u64)
255+
)
256+
)]
228257
pub async fn sleep_for(&mut self, name: &str, duration: std::time::Duration) -> TaskResult<()> {
229258
validate_user_name(name)?;
230259
let checkpoint_name = self.get_checkpoint_name(name);
@@ -268,6 +297,14 @@ where
268297
/// Some(Duration::from_secs(7 * 24 * 3600)),
269298
/// ).await?;
270299
/// ```
300+
#[cfg_attr(
301+
feature = "telemetry",
302+
tracing::instrument(
303+
name = "durable.task.await_event",
304+
skip(self, timeout),
305+
fields(task_id = %self.task_id, event_name = %event_name)
306+
)
307+
)]
271308
pub async fn await_event<T: DeserializeOwned>(
272309
&mut self,
273310
event_name: &str,
@@ -323,6 +360,14 @@ where
323360
/// updates the payload (last write wins). Tasks waiting for this event
324361
/// are woken with the payload at the time of the write that woke them;
325362
/// subsequent writes do not propagate to already-woken tasks.
363+
#[cfg_attr(
364+
feature = "telemetry",
365+
tracing::instrument(
366+
name = "durable.task.emit_event",
367+
skip(self, payload),
368+
fields(task_id = %self.task_id, event_name = %event_name)
369+
)
370+
)]
326371
pub async fn emit_event<T: Serialize>(&self, event_name: &str, payload: &T) -> TaskResult<()> {
327372
if event_name.is_empty() {
328373
return Err(TaskError::Failed(anyhow::anyhow!(
@@ -352,6 +397,14 @@ where
352397
///
353398
/// # Errors
354399
/// Returns `TaskError::Control(Cancelled)` if the task was cancelled.
400+
#[cfg_attr(
401+
feature = "telemetry",
402+
tracing::instrument(
403+
name = "durable.task.heartbeat",
404+
skip(self),
405+
fields(task_id = %self.task_id)
406+
)
407+
)]
355408
pub async fn heartbeat(&self, duration: Option<std::time::Duration>) -> TaskResult<()> {
356409
let extend_by = duration
357410
.map(|d| d.as_secs() as i32)
@@ -454,6 +507,14 @@ where
454507
/// let r1: ItemResult = ctx.join("item-1", h1).await?;
455508
/// let r2: ItemResult = ctx.join("item-2", h2).await?;
456509
/// ```
510+
#[cfg_attr(
511+
feature = "telemetry",
512+
tracing::instrument(
513+
name = "durable.task.spawn",
514+
skip(self, params, options),
515+
fields(task_id = %self.task_id, subtask_name = T::NAME)
516+
)
517+
)]
457518
pub async fn spawn<T>(
458519
&mut self,
459520
name: &str,
@@ -515,6 +576,19 @@ where
515576
options: SpawnOptions,
516577
) -> TaskResult<TaskHandle<T>> {
517578
validate_user_name(name)?;
579+
580+
// Validate headers don't use reserved prefix
581+
if let Some(ref headers) = options.headers {
582+
for key in headers.keys() {
583+
if key.starts_with("durable::") {
584+
return Err(TaskError::Failed(anyhow::anyhow!(
585+
"Header key '{}' uses reserved prefix 'durable::'. User headers cannot start with 'durable::'.",
586+
key
587+
)));
588+
}
589+
}
590+
}
591+
518592
let checkpoint_name = self.get_checkpoint_name(&format!("$spawn:{name}"));
519593

520594
// Return cached task_id if already spawned
@@ -574,8 +648,7 @@ where
574648
///
575649
/// # Arguments
576650
///
577-
/// * `name` - Unique name for this join operation (used for checkpointing).
578-
/// Uniqueness is constrained just within this task, not globally or for child tasks.
651+
/// * `name` - Unique name for this join operation (used for checkpointing)
579652
/// * `handle` - The [`TaskHandle`] returned by [`spawn`](Self::spawn)
580653
///
581654
/// # Errors
@@ -590,6 +663,14 @@ where
590663
/// // ... do other work ...
591664
/// let result: ComputeResult = ctx.join("compute", handle).await?;
592665
/// ```
666+
#[cfg_attr(
667+
feature = "telemetry",
668+
tracing::instrument(
669+
name = "durable.task.join",
670+
skip(self, handle),
671+
fields(task_id = %self.task_id, child_task_id = %handle.task_id)
672+
)
673+
)]
593674
pub async fn join<T: DeserializeOwned>(
594675
&mut self,
595676
name: &str,

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ mod client;
9696
mod context;
9797
mod error;
9898
mod task;
99+
#[cfg(feature = "telemetry")]
100+
pub mod telemetry;
99101
mod types;
100102
mod worker;
101103

0 commit comments

Comments
 (0)