From e81adf07a0e245a47a9cb622c3b1925b80cbdd95 Mon Sep 17 00:00:00 2001 From: Guillaume Deconinck Date: Mon, 11 Aug 2025 11:09:34 +0900 Subject: [PATCH 1/2] feat(logs): add correlationId and trace id --- Cargo.lock | 2 + bins/nittei/Cargo.toml | 3 + .../src/telemetry/datadog_json_format.rs | 154 ++++++++++++++++++ .../src/{telemetry.rs => telemetry/mod.rs} | 10 +- crates/api/Cargo.toml | 8 +- crates/api/src/lib.rs | 10 +- crates/api/src/telemetry/correlation_layer.rs | 75 +++++++++ crates/api/src/{ => telemetry}/http_logger.rs | 9 + crates/api/src/telemetry/mod.rs | 2 + crates/domain/src/event_instance.rs | 8 +- 10 files changed, 271 insertions(+), 10 deletions(-) create mode 100644 bins/nittei/src/telemetry/datadog_json_format.rs rename bins/nittei/src/{telemetry.rs => telemetry/mod.rs} (95%) create mode 100644 crates/api/src/telemetry/correlation_layer.rs rename crates/api/src/{ => telemetry}/http_logger.rs (96%) create mode 100644 crates/api/src/telemetry/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 9e631876..4f9014ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1679,6 +1679,7 @@ dependencies = [ "utoipa", "utoipa-axum", "utoipa-swagger-ui", + "uuid", "validator", ] @@ -3720,6 +3721,7 @@ dependencies = [ "sharded-slab", "smallvec", "thread_local", + "time", "tracing", "tracing-core", "tracing-log", diff --git a/bins/nittei/Cargo.toml b/bins/nittei/Cargo.toml index 073c4194..459a6646 100644 --- a/bins/nittei/Cargo.toml +++ b/bins/nittei/Cargo.toml @@ -39,6 +39,7 @@ tracing-subscriber = { version = "0.3", features = [ "fmt", "json", "registry", + "time", ] } opentelemetry = { version = "0.30.0", default-features = false, features = [ "trace", @@ -61,6 +62,8 @@ reqwest = { version = "0.12", default-features = false, features = [ chrono = "0.4.39" chrono-tz = "0.10.1" +serde_json = "1" + # Use the `jemallocator` crate to use jemalloc as the global allocator. tikv-jemallocator = "0.6" diff --git a/bins/nittei/src/telemetry/datadog_json_format.rs b/bins/nittei/src/telemetry/datadog_json_format.rs new file mode 100644 index 00000000..4fc701e1 --- /dev/null +++ b/bins/nittei/src/telemetry/datadog_json_format.rs @@ -0,0 +1,154 @@ +use nittei_api::telemetry::correlation_layer::CorrelationId; +use opentelemetry::trace::TraceContextExt; +use tracing::{Event, Subscriber}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing_subscriber::{ + fmt::format::{FormatEvent, FormatFields}, + registry::LookupSpan, +}; + +pub struct DatadogJsonFmt; + +impl FormatEvent for DatadogJsonFmt +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'writer> FormatFields<'writer> + 'static, +{ + fn format_event( + &self, + ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>, + mut writer: tracing_subscriber::fmt::format::Writer<'_>, + event: &Event<'_>, + ) -> std::fmt::Result { + use std::borrow::Cow; + + // Collect the event fields into a JSON map + let mut fields_map = serde_json::Map::new(); + + // Custom visitor to collect fields + struct FieldVisitor<'a>(&'a mut serde_json::Map); + + impl<'a> tracing::field::Visit for FieldVisitor<'a> { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.0.insert( + field.name().to_string(), + serde_json::Value::String(format!("{:?}", value)), + ); + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.0.insert( + field.name().to_string(), + serde_json::Value::String(value.to_string()), + ); + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + self.0.insert( + field.name().to_string(), + serde_json::Value::Number(serde_json::Number::from(value)), + ); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + self.0.insert( + field.name().to_string(), + serde_json::Value::Number(serde_json::Number::from(value)), + ); + } + + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.0 + .insert(field.name().to_string(), serde_json::Value::Bool(value)); + } + } + + let mut visitor = FieldVisitor(&mut fields_map); + event.record(&mut visitor); + let fields_value = serde_json::Value::Object(fields_map); + + // Get current span's otel context -> Datadog ids + let (dd_trace_id, dd_span_id) = { + let span = tracing::Span::current(); + let ctx_otel = span.context(); + let otel_span = ctx_otel.span(); + let sc = otel_span.span_context(); + if sc.is_valid() { + // Datadog expects 64-bit decimal ids: + let trace_id_bytes = sc.trace_id().to_bytes(); + let tid64 = u64::from_be_bytes([ + trace_id_bytes[8], + trace_id_bytes[9], + trace_id_bytes[10], + trace_id_bytes[11], + trace_id_bytes[12], + trace_id_bytes[13], + trace_id_bytes[14], + trace_id_bytes[15], + ]); + let sid64 = u64::from_be_bytes(sc.span_id().to_bytes()); + (Some(tid64), Some(sid64)) + } else { + (None, None) + } + }; + + // Include correlation_id field from the current span if present + let correlation_id = { + if let Some(scope) = ctx.lookup_current() { + scope + .extensions() + .get::() + .map(|c| Cow::from(c.0.clone())) + } else { + None + } + }; + + // Build final JSON + let mut obj = serde_json::Map::new(); + + // timestamp + obj.insert( + "@timestamp".into(), + serde_json::Value::String(chrono::Utc::now().to_rfc3339()), + ); + + // level & target + let meta = event.metadata(); + obj.insert("level".into(), meta.level().to_string().into()); + obj.insert("target".into(), meta.target().into()); + + // event fields + if let serde_json::Value::Object(map) = fields_value { + for (k, v) in map { + obj.insert(k, v); + } + } + + // correlation id (if any) + if let Some(cid) = correlation_id { + obj.insert( + "correlation_id".into(), + serde_json::Value::String(cid.into_owned()), + ); + } + + // datadog correlation fields + if let (Some(t), Some(s)) = (dd_trace_id, dd_span_id) { + obj.insert( + "dd.trace_id".into(), + serde_json::Value::String(t.to_string()), + ); + obj.insert( + "dd.span_id".into(), + serde_json::Value::String(s.to_string()), + ); + } + + // write out one line + let json_str = + serde_json::to_string(&serde_json::Value::Object(obj)).map_err(|_| std::fmt::Error)?; + writeln!(writer, "{}", json_str) + } +} diff --git a/bins/nittei/src/telemetry.rs b/bins/nittei/src/telemetry/mod.rs similarity index 95% rename from bins/nittei/src/telemetry.rs rename to bins/nittei/src/telemetry/mod.rs index 8b64a1ff..4ab29a59 100644 --- a/bins/nittei/src/telemetry.rs +++ b/bins/nittei/src/telemetry/mod.rs @@ -9,6 +9,10 @@ use opentelemetry_sdk::{ use tracing::warn; use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt}; +pub mod datadog_json_format; + +use datadog_json_format::DatadogJsonFmt; + /// Register a subscriber as global default to process span data. /// /// It should only be called once! @@ -60,7 +64,11 @@ pub fn init_subscriber() -> anyhow::Result<()> { .with( tracing_subscriber::fmt::layer() .json() - .with_current_span(false), + .with_target(false) + .with_file(false) + .with_line_number(false) + .with_current_span(false) + .event_format(DatadogJsonFmt), ) .with(telemetry_layer); diff --git a/crates/api/Cargo.toml b/crates/api/Cargo.toml index f57ba51a..68b172f1 100644 --- a/crates/api/Cargo.toml +++ b/crates/api/Cargo.toml @@ -27,7 +27,7 @@ tower = "0.5" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -futures = "0.3" + validator = { version = "0.20", features = ["derive"] } reqwest = { version = "0.12", default-features = false, features = [ "http2", @@ -38,14 +38,18 @@ utoipa = { version = "5.3", features = ["axum_extras", "uuid", "chrono"] } utoipa-axum = { version = "0.2" } utoipa-swagger-ui = { version = "9.0", features = ["axum", "vendored"] } -async-trait = "0.1" rrule = "0.14" chrono = { version = "0.4.39", features = ["serde"] } chrono-tz = "0.10.1" +uuid = "1.17" + anyhow = "1.0" jsonwebtoken = "9" thiserror = "2.0" + +async-trait = "0.1" tokio = { version = "1.0", features = ["full"] } +futures = "0.3" opentelemetry = "0.30.0" opentelemetry-http = "0.30.0" diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index 1bd02395..f6d5a62a 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -2,19 +2,18 @@ mod account; mod calendar; mod error; mod event; -mod http_logger; mod job_schedulers; mod schedule; mod service; mod shared; mod status; +pub mod telemetry; mod user; use std::sync::Arc; use axum::{Extension, Router, http::header}; use futures::lock::Mutex; -use http_logger::metadata_middleware; use job_schedulers::{start_reminder_generation_job, start_send_reminders_job}; use nittei_domain::{ Account, @@ -25,6 +24,7 @@ use nittei_domain::{ PEMKey, }; use nittei_infra::NitteiContext; +use telemetry::http_logger::metadata_middleware; use tokio::{ net::TcpListener, sync::oneshot::{self, Sender}, @@ -48,8 +48,11 @@ use utoipa_axum::router::OpenApiRouter; use utoipa_swagger_ui::SwaggerUi; use crate::{ - http_logger::{NitteiTracingOnFailure, NitteiTracingOnResponse, NitteiTracingSpanBuilder}, shared::auth::NITTEI_X_API_KEY_HEADER, + telemetry::{ + correlation_layer::CorrelationIdLayer, + http_logger::{NitteiTracingOnFailure, NitteiTracingOnResponse, NitteiTracingSpanBuilder}, + }, }; /// Configure the Actix server API @@ -158,6 +161,7 @@ impl Application { .on_failure(NitteiTracingOnFailure {}), ), ) + .layer(CorrelationIdLayer) .layer(Extension(context.clone())) .layer(Extension(shared_state.clone())) .split_for_parts(); diff --git a/crates/api/src/telemetry/correlation_layer.rs b/crates/api/src/telemetry/correlation_layer.rs new file mode 100644 index 00000000..09a2537f --- /dev/null +++ b/crates/api/src/telemetry/correlation_layer.rs @@ -0,0 +1,75 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use axum::{ + body::Body, + http::{HeaderName, HeaderValue, Request}, + response::Response, +}; +use tower::{Layer, Service}; +use uuid::Uuid; + +static CORRELATION_ID_HEADER: HeaderName = HeaderName::from_static("x-correlation-id"); + +#[derive(Clone, Default)] +pub struct CorrelationIdLayer; + +impl Layer for CorrelationIdLayer { + type Service = CorrelationIdService; + fn layer(&self, inner: S) -> Self::Service { + CorrelationIdService { inner } + } +} + +#[derive(Clone)] +pub struct CorrelationIdService { + inner: S, +} + +impl Service> for CorrelationIdService +where + S: Service, Response = Response> + Clone + Send + 'static, + S::Future: Send + 'static, + S::Error: Into>, +{ + type Response = Response; + type Error = S::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: Request) -> Self::Future { + // 1) get or generate + let cid = req + .headers() + .get(&CORRELATION_ID_HEADER) + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_owned()) + .unwrap_or_else(|| Uuid::new_v4().to_string()); + + // 2) put in extensions so handlers & TraceLayer can read it + req.extensions_mut().insert(CorrelationId(cid.clone())); + + // proceed + let fut = self.inner.clone().call(req); + + Box::pin(async move { + let mut res = fut.await?; + + // 3) echo back on response + res.headers_mut().insert( + CORRELATION_ID_HEADER.clone(), + HeaderValue::from_str(&cid).unwrap_or(HeaderValue::from_static("invalid")), + ); + + Ok(res) + }) + } +} + +#[derive(Clone, Debug)] +pub struct CorrelationId(pub String); diff --git a/crates/api/src/http_logger.rs b/crates/api/src/telemetry/http_logger.rs similarity index 96% rename from crates/api/src/http_logger.rs rename to crates/api/src/telemetry/http_logger.rs index eb69175b..d3c63ed8 100644 --- a/crates/api/src/http_logger.rs +++ b/crates/api/src/telemetry/http_logger.rs @@ -12,6 +12,8 @@ use tower_http::trace::{MakeSpan, OnFailure, OnResponse}; use tracing::{Span, field::Empty}; use tracing_opentelemetry::OpenTelemetrySpanExt; +use crate::telemetry::correlation_layer::CorrelationId; + const PATHS_TO_EXCLUDE_FROM_LOGGING_AND_TRACING: [&str; 2] = ["/api/v1/healthcheck", "/api/v1/metrics"]; @@ -61,6 +63,12 @@ impl MakeSpan for NitteiTracingSpanBuilder { .map(|r| r.as_str().to_string()) .unwrap_or_default(); + let correlation_id = request + .extensions() + .get::() + .map(|r| r.0.clone()) + .unwrap_or_default(); + // By default, exclude health check and metrics from tracing if PATHS_TO_EXCLUDE_FROM_LOGGING_AND_TRACING.contains(&path) && !APP_CONFIG @@ -94,6 +102,7 @@ impl MakeSpan for NitteiTracingSpanBuilder { exception.message = Empty, // to set on response "span.type" = "web", level = Empty, // will be set in on_response based on status code + correlation_id = %correlation_id, ); // Set the parent span for the OpenTelemetry span diff --git a/crates/api/src/telemetry/mod.rs b/crates/api/src/telemetry/mod.rs new file mode 100644 index 00000000..64e53bbe --- /dev/null +++ b/crates/api/src/telemetry/mod.rs @@ -0,0 +1,2 @@ +pub mod correlation_layer; +pub mod http_logger; diff --git a/crates/domain/src/event_instance.rs b/crates/domain/src/event_instance.rs index d0c614db..acb97c61 100644 --- a/crates/domain/src/event_instance.rs +++ b/crates/domain/src/event_instance.rs @@ -80,11 +80,11 @@ impl CompatibleInstances { pub fn push_back(&mut self, instance: EventInstance) -> bool { if !self.events.is_empty() { - if let Some(last_instance) = self.events.back() { + if let Some(last_instance) = self.events.back() + && last_instance.end_time > instance.start_time + { // There is overlap, so cannot be added - if last_instance.end_time > instance.start_time { - return false; - } + return false; } } self.events.push_back(instance); From c956c88b3e375048d25d831803e46884e5480999 Mon Sep 17 00:00:00 2001 From: Guillaume Deconinck Date: Mon, 11 Aug 2025 11:27:32 +0900 Subject: [PATCH 2/2] refactor: rename --- .../{datadog_json_format.rs => logs_json_format.rs} | 10 +++++++--- bins/nittei/src/telemetry/mod.rs | 6 +++--- 2 files changed, 10 insertions(+), 6 deletions(-) rename bins/nittei/src/telemetry/{datadog_json_format.rs => logs_json_format.rs} (93%) diff --git a/bins/nittei/src/telemetry/datadog_json_format.rs b/bins/nittei/src/telemetry/logs_json_format.rs similarity index 93% rename from bins/nittei/src/telemetry/datadog_json_format.rs rename to bins/nittei/src/telemetry/logs_json_format.rs index 4fc701e1..d7f23e33 100644 --- a/bins/nittei/src/telemetry/datadog_json_format.rs +++ b/bins/nittei/src/telemetry/logs_json_format.rs @@ -7,13 +7,15 @@ use tracing_subscriber::{ registry::LookupSpan, }; -pub struct DatadogJsonFmt; +/// Formatter for logs as JSON with correlation id & Datadog trace & span ids +pub struct LogsJsonFmt; -impl FormatEvent for DatadogJsonFmt +impl FormatEvent for LogsJsonFmt where S: Subscriber + for<'a> LookupSpan<'a>, N: for<'writer> FormatFields<'writer> + 'static, { + /// Format the event as JSON with correlation id & Datadog trace & span ids fn format_event( &self, ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>, @@ -28,6 +30,7 @@ where // Custom visitor to collect fields struct FieldVisitor<'a>(&'a mut serde_json::Map); + // Collect the event fields into a JSON map impl<'a> tracing::field::Visit for FieldVisitor<'a> { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { self.0.insert( @@ -63,6 +66,7 @@ where } } + // Collect the event fields into a JSON map let mut visitor = FieldVisitor(&mut fields_map); event.record(&mut visitor); let fields_value = serde_json::Value::Object(fields_map); @@ -134,7 +138,7 @@ where ); } - // datadog correlation fields + // datadog trace & span ids if let (Some(t), Some(s)) = (dd_trace_id, dd_span_id) { obj.insert( "dd.trace_id".into(), diff --git a/bins/nittei/src/telemetry/mod.rs b/bins/nittei/src/telemetry/mod.rs index 4ab29a59..13a9529d 100644 --- a/bins/nittei/src/telemetry/mod.rs +++ b/bins/nittei/src/telemetry/mod.rs @@ -9,9 +9,9 @@ use opentelemetry_sdk::{ use tracing::warn; use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt}; -pub mod datadog_json_format; +pub mod logs_json_format; -use datadog_json_format::DatadogJsonFmt; +use logs_json_format::LogsJsonFmt; /// Register a subscriber as global default to process span data. /// @@ -68,7 +68,7 @@ pub fn init_subscriber() -> anyhow::Result<()> { .with_file(false) .with_line_number(false) .with_current_span(false) - .event_format(DatadogJsonFmt), + .event_format(LogsJsonFmt), ) .with(telemetry_layer);