From 70b45024a1e70f0cf00d7f76f7bf3057af5cbacd Mon Sep 17 00:00:00 2001 From: vahid Date: Fri, 3 Apr 2026 01:28:12 +0300 Subject: [PATCH 1/4] feat: HTTP/Webhook sink Delivers CDC events via HTTP POST/PUT to any URL. Features: - URL templates for per-table routing (${source.table}) - Custom headers with ${ENV_VAR} expansion (Bearer, Basic, API key) - Batch mode: JSON array in one request, or per-event requests - Retry on 5xx, 408, 429, connection errors, timeouts - Auth errors (401/403) fail immediately - DLQ-eligible routing errors (URL template failures) - Connection pooling via reqwest Integration tests (8): single event, batch per-event, batch mode array, auth error (no retry), 5xx retries, URL template routing, custom headers, connection refused retries. All use local axum test server. --- crates/deltaforge-config/src/sinks_cfg.rs | 62 +++ crates/sinks/Cargo.toml | 5 +- crates/sinks/src/http.rs | 511 ++++++++++++++++++++++ crates/sinks/src/lib.rs | 10 + crates/sinks/tests/http_sink_tests.rs | 346 +++++++++++++++ docs/src/SUMMARY.md | 1 + docs/src/sinks/README.md | 2 + docs/src/sinks/http.md | 178 ++++++++ 8 files changed, 1114 insertions(+), 1 deletion(-) create mode 100644 crates/sinks/src/http.rs create mode 100644 crates/sinks/tests/http_sink_tests.rs create mode 100644 docs/src/sinks/http.md diff --git a/crates/deltaforge-config/src/sinks_cfg.rs b/crates/deltaforge-config/src/sinks_cfg.rs index 357a776..9b91158 100644 --- a/crates/deltaforge-config/src/sinks_cfg.rs +++ b/crates/deltaforge-config/src/sinks_cfg.rs @@ -63,6 +63,7 @@ pub enum SinkCfg { Kafka(KafkaSinkCfg), Redis(RedisSinkCfg), Nats(NatsSinkCfg), + Http(HttpSinkCfg), } impl SinkCfg { @@ -72,6 +73,7 @@ impl SinkCfg { Self::Kafka(c) => &c.id, Self::Redis(c) => &c.id, Self::Nats(c) => &c.id, + Self::Http(c) => &c.id, } } } @@ -332,6 +334,66 @@ pub struct NatsSinkCfg { pub filter: Option, } +/// HTTP/Webhook sink configuration. +/// +/// Delivers events via HTTP POST (or PUT) to any URL. 
Supports dynamic URL +/// templates, custom headers with env var expansion, and optional batch mode. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HttpSinkCfg { + /// Unique identifier for this sink. + pub id: String, + + /// Target URL. Supports `${path}` templates for per-event routing. + /// Example: `https://api.example.com/cdc/${source.table}` + pub url: String, + + /// HTTP method. Default: POST. + #[serde(default = "default_http_method")] + pub method: String, + + /// Static headers added to every request. Values support `${ENV_VAR}` expansion. + /// Example: `{"Authorization": "Bearer ${API_TOKEN}", "X-Source": "deltaforge"}` + #[serde(default)] + pub headers: std::collections::HashMap, + + /// Batch mode: if true, send a JSON array of events in one request. + /// If false (default), send one request per event. + #[serde(default)] + pub batch_mode: bool, + + /// Envelope format for event serialization. + #[serde(default)] + pub envelope: EnvelopeCfg, + + /// Wire encoding format. + #[serde(default)] + pub encoding: EncodingCfg, + + /// Whether this sink must succeed for checkpoint to proceed. + #[serde(default)] + pub required: Option, + + /// Timeout for individual HTTP requests (seconds). Default: 10. + #[serde(default)] + pub send_timeout_secs: Option, + + /// Timeout for batch operations (seconds). Default: 30. + #[serde(default)] + pub batch_timeout_secs: Option, + + /// Timeout for TCP connection establishment (seconds). Default: 5. + #[serde(default)] + pub connect_timeout_secs: Option, + + /// Optional filter applied before delivery. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub filter: Option, +} + +fn default_http_method() -> String { + "POST".to_string() +} + // ============================================================================ // Conversion helpers (config → core types) // ============================================================================ diff --git a/crates/sinks/Cargo.toml b/crates/sinks/Cargo.toml index 5194b6d..0c17047 100644 --- a/crates/sinks/Cargo.toml +++ b/crates/sinks/Cargo.toml @@ -14,6 +14,8 @@ metrics.workspace = true rdkafka.workspace = true redis.workspace = true async-nats = "0.38" +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +shellexpand = "3" tokio.workspace = true tokio-util.workspace = true deltaforge-core = { path = "../deltaforge-core" } @@ -21,7 +23,8 @@ deltaforge-config = { path = "../deltaforge-config" } common = { path = "../common" } [dev-dependencies] -tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +tokio = { version = "1", features = ["macros", "rt-multi-thread", "net"] } +axum = "0.8" ctor = "0.6" chrono.workspace = true uuid.workspace = true diff --git a/crates/sinks/src/http.rs b/crates/sinks/src/http.rs new file mode 100644 index 0000000..7a8d94a --- /dev/null +++ b/crates/sinks/src/http.rs @@ -0,0 +1,511 @@ +//! HTTP/Webhook sink — delivers CDC events via HTTP POST/PUT to any URL. +//! +//! Supports dynamic URL templates, custom headers with env var expansion, +//! batch mode (JSON array), and retry with exponential backoff on 5xx/network errors. 
+ +use std::borrow::Cow; +use std::time::Duration; + +use anyhow::Context; +use async_trait::async_trait; +use common::retry::{RetryOutcome, RetryPolicy, retry_async}; +use common::routing::CompiledTemplate; +use deltaforge_config::HttpSinkCfg; +use deltaforge_core::encoding::EncodingType; +use deltaforge_core::envelope::Envelope; +use deltaforge_core::{BatchResult, Event, Sink, SinkError, SinkResult}; +use metrics::counter; +use serde_json::Value; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, instrument, warn}; + +/// Default timeout for individual HTTP requests. +const DEFAULT_SEND_TIMEOUT: Duration = Duration::from_secs(10); +/// Default timeout for batch operations. +const DEFAULT_BATCH_TIMEOUT: Duration = Duration::from_secs(30); +/// Default timeout for TCP connection establishment. +const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); + +pub struct HttpSink { + id: String, + pipeline: String, + client: reqwest::Client, + url: String, + url_template: CompiledTemplate, + method: reqwest::Method, + headers: reqwest::header::HeaderMap, + batch_mode: bool, + envelope: Box, + encoding: EncodingType, + required: bool, + send_timeout: Duration, + batch_timeout: Duration, + cancel: CancellationToken, +} + +impl HttpSink { + #[instrument(skip_all, fields(sink_id = %cfg.id, url = %cfg.url))] + pub fn new( + cfg: &HttpSinkCfg, + cancel: CancellationToken, + pipeline: &str, + ) -> anyhow::Result { + let connect_timeout = cfg + .connect_timeout_secs + .map(|s| Duration::from_secs(s as u64)) + .unwrap_or(DEFAULT_CONNECT_TIMEOUT); + + let send_timeout = cfg + .send_timeout_secs + .map(|s| Duration::from_secs(s as u64)) + .unwrap_or(DEFAULT_SEND_TIMEOUT); + + let batch_timeout = cfg + .batch_timeout_secs + .map(|s| Duration::from_secs(s as u64)) + .unwrap_or(DEFAULT_BATCH_TIMEOUT); + + // Build reqwest client with connection pooling and timeouts. 
+ let client = reqwest::Client::builder() + .connect_timeout(connect_timeout) + .timeout(send_timeout) + .pool_max_idle_per_host(10) + .build() + .context("failed to build HTTP client")?; + + // Parse URL template. + let url_template = CompiledTemplate::parse(&cfg.url) + .map_err(|e| anyhow::anyhow!("invalid URL template: {e}"))?; + + if !url_template.is_static() { + info!( + id = %cfg.id, + template = %cfg.url, + "URL uses dynamic routing template" + ); + } + + // Parse HTTP method. + let method = cfg.method.parse::().map_err(|e| { + anyhow::anyhow!("invalid HTTP method '{}': {e}", cfg.method) + })?; + + // Build static headers with env var expansion. + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + reqwest::header::CONTENT_TYPE, + "application/json".parse().unwrap(), + ); + for (key, value) in &cfg.headers { + // Expand ${ENV_VAR} in header values. + let expanded = shellexpand::env(value) + .unwrap_or(Cow::Borrowed(value)); + let header_name = key + .parse::() + .map_err(|e| anyhow::anyhow!("invalid header name '{key}': {e}"))?; + let header_value = expanded + .parse::() + .map_err(|e| { + anyhow::anyhow!("invalid header value for '{key}': {e}") + })?; + headers.insert(header_name, header_value); + } + + let envelope_type = cfg.envelope.to_envelope_type(); + let encoding_type = cfg.encoding.to_encoding_type(); + + info!( + url = %cfg.url, + method = %cfg.method, + batch_mode = cfg.batch_mode, + headers = cfg.headers.len(), + envelope = %envelope_type.name(), + encoding = encoding_type.name(), + send_timeout_ms = send_timeout.as_millis(), + "http sink created" + ); + + Ok(Self { + id: cfg.id.clone(), + pipeline: pipeline.to_string(), + client, + url: cfg.url.clone(), + url_template, + method, + headers, + batch_mode: cfg.batch_mode, + envelope: envelope_type.build(), + encoding: encoding_type, + required: cfg.required.unwrap_or(true), + send_timeout, + batch_timeout, + cancel, + }) + } + + fn resolve_url(&self, event: &Event) -> 
SinkResult { + // Check event routing override. + if let Some(t) = + event.routing.as_ref().and_then(|r| r.effective_topic()) + { + return Ok(t.to_string()); + } + // Fast path: static URL. + if self.url_template.is_static() { + return Ok(self.url.clone()); + } + // Resolve template against event JSON. + let event_json = serde_json::to_value(event).map_err(|e| { + SinkError::Serialization { + details: e.to_string().into(), + } + })?; + self.url_template + .resolve_strict(&event_json) + .map_err(|e| SinkError::Routing { + details: e.to_string().into(), + }) + } + + fn serialize_event(&self, event: &Event) -> SinkResult> { + // Outbox raw mode: write event.after directly. + if event.routing.as_ref().is_some_and(|r| r.raw_payload) { + return serde_json::to_vec( + event.after.as_ref().unwrap_or(&Value::Null), + ) + .map_err(Into::into); + } + // Wrap with envelope and encode. + let envelope = self.envelope.wrap(event).map_err(|e| { + SinkError::Serialization { + details: e.to_string().into(), + } + })?; + self.encoding + .encode(&envelope) + .map(|b| b.to_vec()) + .map_err(|e| SinkError::Serialization { + details: e.to_string().into(), + }) + } + + /// Send a single HTTP request with the given body to the given URL. 
+ async fn do_send( + &self, + url: &str, + body: Vec, + ) -> Result<(), HttpRetryError> { + let resp = self + .client + .request(self.method.clone(), url) + .headers(self.headers.clone()) + .body(body) + .send() + .await + .map_err(|e| { + if e.is_connect() { + HttpRetryError::Connect(e.to_string()) + } else if e.is_timeout() { + HttpRetryError::Timeout + } else { + HttpRetryError::Network(e.to_string()) + } + })?; + + let status = resp.status(); + if status.is_success() { + return Ok(()); + } + + let body = resp.text().await.unwrap_or_default(); + let msg = format!("{status}: {body}"); + + match status.as_u16() { + 401 | 403 => Err(HttpRetryError::Auth(msg)), + 408 | 429 => Err(HttpRetryError::Retryable(msg)), + s if s >= 500 => Err(HttpRetryError::Retryable(msg)), + _ => Err(HttpRetryError::Permanent(msg)), + } + } +} + +#[async_trait] +impl Sink for HttpSink { + fn id(&self) -> &str { + &self.id + } + + fn required(&self) -> bool { + self.required + } + + #[instrument(skip_all, fields(sink_id = %self.id))] + async fn send(&self, event: &Event) -> SinkResult<()> { + let payload = self.serialize_event(event)?; + let url = self.resolve_url(event)?; + + let policy = RetryPolicy::new( + Duration::from_millis(100), + Duration::from_secs(10), + 0.2, + Some(3), + ); + + let result = retry_async( + |attempt| { + let url = url.clone(); + let payload = payload.clone(); + async move { + debug!(attempt, url = %url, "sending event via HTTP"); + self.do_send(&url, payload).await + } + }, + |e| e.is_retryable(), + self.send_timeout, + policy, + &self.cancel, + "http_send", + ) + .await; + + match result { + Ok(_) => { + counter!( + "deltaforge_sink_bytes_total", + "pipeline" => self.pipeline.clone(), + "sink" => self.id.clone(), + ) + .increment(payload.len() as u64); + debug!("event sent via HTTP"); + Ok(()) + } + Err(outcome) => Err(outcome_to_sink_error(outcome)), + } + } + + #[instrument(skip_all, fields(sink_id = %self.id, count = events.len()))] + async fn 
send_batch(&self, events: &[Event]) -> SinkResult { + if events.is_empty() { + return Ok(BatchResult::ok()); + } + + // Pre-serialize with resolved URLs. + let mut serialized: Vec<(String, Vec)> = + Vec::with_capacity(events.len()); + let mut dlq_failures: Vec<(usize, SinkError)> = Vec::new(); + + for (i, e) in events.iter().enumerate() { + match (|| -> Result<_, SinkError> { + let url = self.resolve_url(e)?; + let payload = self.serialize_event(e)?; + Ok((url, payload)) + })() { + Ok(prepared) => serialized.push(prepared), + Err(e) if e.is_dlq_eligible() => { + counter!( + "deltaforge_sink_routing_errors_total", + "pipeline" => self.pipeline.clone(), + "sink" => self.id.clone(), + "kind" => e.kind(), + ) + .increment(1); + dlq_failures.push((i, e)); + } + Err(e) => { + counter!( + "deltaforge_sink_routing_errors_total", + "pipeline" => self.pipeline.clone(), + "sink" => self.id.clone(), + "kind" => e.kind(), + ) + .increment(1); + return Err(e); + } + } + } + + if serialized.is_empty() { + return Ok(BatchResult { dlq_failures }); + } + + if self.batch_mode { + // Batch mode: combine all payloads into a JSON array. 
+ let url = &serialized[0].0; + let array_body = { + let items: Vec = serialized + .iter() + .filter_map(|(_, payload)| { + serde_json::from_slice(payload).ok() + }) + .collect(); + serde_json::to_vec(&items).unwrap_or_default() + }; + + let policy = RetryPolicy::new( + Duration::from_millis(200), + Duration::from_secs(15), + 0.2, + Some(3), + ); + + let url = url.clone(); + let event_count = serialized.len(); + let result = retry_async( + |attempt| { + let url = url.clone(); + let body = array_body.clone(); + async move { + debug!(attempt, count = event_count, "sending batch via HTTP"); + self.do_send(&url, body).await + } + }, + |e| e.is_retryable(), + self.batch_timeout, + policy, + &self.cancel, + "http_batch", + ) + .await; + + match result { + Ok(_) => { + let total_bytes: u64 = + serialized.iter().map(|(_, p)| p.len() as u64).sum(); + counter!( + "deltaforge_sink_bytes_total", + "pipeline" => self.pipeline.clone(), + "sink" => self.id.clone(), + ) + .increment(total_bytes); + debug!( + count = events.len(), + dlq_failures = dlq_failures.len(), + "batch sent via HTTP" + ); + Ok(BatchResult { dlq_failures }) + } + Err(outcome) => Err(outcome_to_sink_error(outcome)), + } + } else { + // Per-event mode: send each event individually. 
+ let policy = RetryPolicy::new( + Duration::from_millis(100), + Duration::from_secs(10), + 0.2, + Some(3), + ); + + for (url, payload) in &serialized { + let url = url.clone(); + let payload = payload.clone(); + let result = retry_async( + |attempt| { + let url = url.clone(); + let payload = payload.clone(); + async move { + debug!(attempt, "sending event via HTTP"); + self.do_send(&url, payload).await + } + }, + |e| e.is_retryable(), + self.send_timeout, + policy.clone(), + &self.cancel, + "http_send", + ) + .await; + + if let Err(outcome) = result { + return Err(outcome_to_sink_error(outcome)); + } + } + + let total_bytes: u64 = + serialized.iter().map(|(_, p)| p.len() as u64).sum(); + counter!( + "deltaforge_sink_bytes_total", + "pipeline" => self.pipeline.clone(), + "sink" => self.id.clone(), + ) + .increment(total_bytes); + debug!( + count = events.len(), + dlq_failures = dlq_failures.len(), + "batch sent via HTTP (per-event)" + ); + Ok(BatchResult { dlq_failures }) + } + } +} + +// ============================================================================= +// Error handling +// ============================================================================= + +#[derive(Debug, Clone)] +enum HttpRetryError { + Connect(String), + Timeout, + Network(String), + Auth(String), + Retryable(String), + Permanent(String), +} + +impl HttpRetryError { + fn is_retryable(&self) -> bool { + matches!( + self, + Self::Connect(_) | Self::Timeout | Self::Network(_) | Self::Retryable(_) + ) + } +} + +impl std::fmt::Display for HttpRetryError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Connect(msg) => write!(f, "connection error: {msg}"), + Self::Timeout => write!(f, "request timed out"), + Self::Network(msg) => write!(f, "network error: {msg}"), + Self::Auth(msg) => write!(f, "auth error: {msg}"), + Self::Retryable(msg) => write!(f, "retryable error: {msg}"), + Self::Permanent(msg) => write!(f, "permanent error: {msg}"), + } + 
} +} + +fn outcome_to_sink_error(outcome: RetryOutcome) -> SinkError { + match outcome { + RetryOutcome::Cancelled => { + SinkError::Other(anyhow::anyhow!("operation cancelled")) + } + RetryOutcome::Timeout { action } => SinkError::Backpressure { + details: format!("timeout: {action}").into(), + }, + RetryOutcome::Exhausted { + attempts, + last_error, + } => match &last_error { + HttpRetryError::Auth(msg) => SinkError::Auth { + details: msg.clone().into(), + }, + _ => SinkError::Connect { + details: format!( + "exhausted after {attempts} attempts: {last_error}" + ) + .into(), + }, + }, + RetryOutcome::Failed(e) => match &e { + HttpRetryError::Auth(msg) => SinkError::Auth { + details: msg.clone().into(), + }, + HttpRetryError::Permanent(msg) => SinkError::Connect { + details: msg.clone().into(), + }, + _ => SinkError::Connect { + details: e.to_string().into(), + }, + }, + } +} diff --git a/crates/sinks/src/lib.rs b/crates/sinks/src/lib.rs index 8907a39..5362fbb 100644 --- a/crates/sinks/src/lib.rs +++ b/crates/sinks/src/lib.rs @@ -39,10 +39,12 @@ use deltaforge_core::ArcDynSink; use tokio_util::sync::CancellationToken; pub mod filter; +pub mod http; pub mod kafka; pub mod nats; pub mod redis; pub use filter::FilteredSink; +pub use http::HttpSink; pub use kafka::KafkaSink; pub use nats::NatsSink; pub use redis::RedisSink; @@ -114,6 +116,11 @@ pub fn build_sinks( as ArcDynSink, cfg.filter.clone(), ), + SinkCfg::Http(cfg) => ( + Arc::new(HttpSink::new(cfg, cancel.clone(), pipeline)?) + as ArcDynSink, + cfg.filter.clone(), + ), }; // Only wrap when filter has actual conditions — zero overhead otherwise let sink = match filter { @@ -151,6 +158,9 @@ pub fn build_sink( Arc::new(NatsSink::new(nats_sink_cfg, cancel, pipeline)?) as ArcDynSink } + SinkCfg::Http(http_cfg) => { + Arc::new(HttpSink::new(http_cfg, cancel, pipeline)?) 
as ArcDynSink + } }; Ok(sink) } diff --git a/crates/sinks/tests/http_sink_tests.rs b/crates/sinks/tests/http_sink_tests.rs new file mode 100644 index 0000000..1fbea4a --- /dev/null +++ b/crates/sinks/tests/http_sink_tests.rs @@ -0,0 +1,346 @@ +//! Integration tests for HTTP sink. +//! +//! These tests spin up a local HTTP server (axum) and verify that the HTTP +//! sink delivers events correctly. No external dependencies required. +//! +//! Run with: +//! ```bash +//! cargo test -p sinks --test http_sink_tests -- --nocapture +//! ``` + +use std::collections::HashMap; +use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; + +use anyhow::Result; +use axum::{Json, Router, extract::State, routing::post}; +use deltaforge_config::{EncodingCfg, EnvelopeCfg, HttpSinkCfg}; +use deltaforge_core::{Event, Sink}; +use sinks::http::HttpSink; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; + +mod sink_test_common; +use sink_test_common::{init_test_tracing, make_test_event, make_event_for_table}; + +// ============================================================================= +// Test HTTP Server +// ============================================================================= + +#[derive(Clone)] +struct TestServer { + received: Arc>>, + request_count: Arc, +} + +impl TestServer { + fn new() -> Self { + Self { + received: Arc::new(Mutex::new(Vec::new())), + request_count: Arc::new(AtomicUsize::new(0)), + } + } + + async fn received_events(&self) -> Vec { + self.received.lock().await.clone() + } + + fn request_count(&self) -> usize { + self.request_count.load(Ordering::Relaxed) + } +} + +async fn handle_event( + State(state): State, + Json(body): Json, +) -> axum::http::StatusCode { + state.request_count.fetch_add(1, Ordering::Relaxed); + let mut received = state.received.lock().await; + if body.is_array() { + // Batch mode: each element is an event + if let Some(arr) = body.as_array() { + for item in arr { + received.push(item.clone()); + } + } + } else { + 
received.push(body); + } + axum::http::StatusCode::OK +} + +async fn handle_401( + State(state): State, +) -> axum::http::StatusCode { + state.request_count.fetch_add(1, Ordering::Relaxed); + axum::http::StatusCode::UNAUTHORIZED +} + +async fn handle_500( + State(state): State, +) -> axum::http::StatusCode { + state.request_count.fetch_add(1, Ordering::Relaxed); + axum::http::StatusCode::INTERNAL_SERVER_ERROR +} + +/// Start a test HTTP server on a random port. Returns (port, server task handle). +async fn start_test_server( + handler: Router, +) -> (u16, tokio::task::JoinHandle<()>) { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .unwrap(); + let port = listener.local_addr().unwrap().port(); + let handle = tokio::spawn(async move { + axum::serve(listener, handler).await.unwrap(); + }); + // The listener is already bound; give the spawned server task a moment to start accepting. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + (port, handle) +} + +fn make_http_cfg(id: &str, url: &str) -> HttpSinkCfg { + HttpSinkCfg { + id: id.into(), + url: url.into(), + method: "POST".into(), + headers: HashMap::new(), + batch_mode: false, + envelope: EnvelopeCfg::Native, + encoding: EncodingCfg::Json, + required: Some(true), + send_timeout_secs: Some(5), + batch_timeout_secs: Some(10), + connect_timeout_secs: Some(2), + filter: None, + } +} + +// ============================================================================= +// Tests +// ============================================================================= + +#[tokio::test] +async fn http_sink_sends_single_event() -> Result<()> { + init_test_tracing(); + + let server = TestServer::new(); + let app = Router::new() + .route("/events", post(handle_event)) + .with_state(server.clone()); + let (port, _handle) = start_test_server(app).await; + + let cfg = make_http_cfg("test-http", &format!("http://127.0.0.1:{port}/events")); + let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; + + let event = make_test_event(1); + 
sink.send(&event).await?; + + let received = server.received_events().await; + assert_eq!(received.len(), 1); + assert_eq!(received[0]["after"]["id"], 1); + assert_eq!(server.request_count(), 1); + + Ok(()) +} + +#[tokio::test] +async fn http_sink_sends_batch_per_event() -> Result<()> { + init_test_tracing(); + + let server = TestServer::new(); + let app = Router::new() + .route("/events", post(handle_event)) + .with_state(server.clone()); + let (port, _handle) = start_test_server(app).await; + + let cfg = make_http_cfg("test-batch", &format!("http://127.0.0.1:{port}/events")); + let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; + + let events: Vec = (0..5).map(make_test_event).collect(); + let result = sink.send_batch(&events).await?; + + assert!(result.dlq_failures.is_empty()); + let received = server.received_events().await; + assert_eq!(received.len(), 5); + // 5 separate requests (batch_mode = false) + assert_eq!(server.request_count(), 5); + + Ok(()) +} + +#[tokio::test] +async fn http_sink_batch_mode_sends_array() -> Result<()> { + init_test_tracing(); + + let server = TestServer::new(); + let app = Router::new() + .route("/events", post(handle_event)) + .with_state(server.clone()); + let (port, _handle) = start_test_server(app).await; + + let mut cfg = make_http_cfg("test-batch-mode", &format!("http://127.0.0.1:{port}/events")); + cfg.batch_mode = true; + + let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; + + let events: Vec = (0..5).map(make_test_event).collect(); + let result = sink.send_batch(&events).await?; + + assert!(result.dlq_failures.is_empty()); + let received = server.received_events().await; + assert_eq!(received.len(), 5); + // 1 request with JSON array (batch_mode = true) + assert_eq!(server.request_count(), 1); + + Ok(()) +} + +#[tokio::test] +async fn http_sink_auth_error_fails_immediately() -> Result<()> { + init_test_tracing(); + + let server = TestServer::new(); + let app = Router::new() + 
.route("/events", post(handle_401)) + .with_state(server.clone()); + let (port, _handle) = start_test_server(app).await; + + let cfg = make_http_cfg("test-auth", &format!("http://127.0.0.1:{port}/events")); + let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; + + let event = make_test_event(1); + let result = sink.send(&event).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + matches!(err, deltaforge_core::SinkError::Auth { .. }), + "expected Auth error, got: {:?}", + err + ); + // Should NOT retry — only 1 request + assert_eq!(server.request_count(), 1); + + Ok(()) +} + +#[tokio::test] +async fn http_sink_5xx_retries() -> Result<()> { + init_test_tracing(); + + let server = TestServer::new(); + let app = Router::new() + .route("/events", post(handle_500)) + .with_state(server.clone()); + let (port, _handle) = start_test_server(app).await; + + let cfg = make_http_cfg("test-retry", &format!("http://127.0.0.1:{port}/events")); + let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; + + let event = make_test_event(1); + let result = sink.send(&event).await; + + assert!(result.is_err()); + // Should have retried multiple times (3 attempts configured) + assert!( + server.request_count() > 1, + "expected retries, got {} requests", + server.request_count() + ); + + Ok(()) +} + +#[tokio::test] +async fn http_sink_url_template_routing() -> Result<()> { + init_test_tracing(); + + let server = TestServer::new(); + let app = Router::new() + .route("/events/{table}", post(handle_event)) + .with_state(server.clone()); + let (port, _handle) = start_test_server(app).await; + + let cfg = make_http_cfg( + "test-routing", + &format!("http://127.0.0.1:{port}/events/${{source.table}}"), + ); + let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; + + let event = make_event_for_table(1, "orders"); + sink.send(&event).await?; + + let received = server.received_events().await; + assert_eq!(received.len(), 1); + 
assert_eq!(received[0]["source"]["table"], "orders"); + + Ok(()) +} + +#[tokio::test] +async fn http_sink_custom_headers() -> Result<()> { + init_test_tracing(); + + let server = TestServer::new(); + + // Server that checks for custom header + async fn check_header( + headers: axum::http::HeaderMap, + State(state): State, + Json(body): Json, + ) -> axum::http::StatusCode { + state.request_count.fetch_add(1, Ordering::Relaxed); + let mut received = state.received.lock().await; + // Store the custom header value alongside the event + let header_val = headers + .get("x-custom") + .map(|v| v.to_str().unwrap_or("")) + .unwrap_or("missing"); + let mut enriched = body; + enriched["_test_header"] = serde_json::json!(header_val); + received.push(enriched); + axum::http::StatusCode::OK + } + + let app = Router::new() + .route("/events", post(check_header)) + .with_state(server.clone()); + let (port, _handle) = start_test_server(app).await; + + let mut cfg = make_http_cfg("test-headers", &format!("http://127.0.0.1:{port}/events")); + cfg.headers.insert("X-Custom".into(), "deltaforge-test".into()); + + let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; + + let event = make_test_event(1); + sink.send(&event).await?; + + let received = server.received_events().await; + assert_eq!(received.len(), 1); + assert_eq!(received[0]["_test_header"], "deltaforge-test"); + + Ok(()) +} + +#[tokio::test] +async fn http_sink_connection_refused_retries() -> Result<()> { + init_test_tracing(); + + // No server running on this port + let cfg = make_http_cfg("test-refused", "http://127.0.0.1:19999/events"); + let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; + + let event = make_test_event(1); + let result = sink.send(&event).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + // Should be a connection error after retries + assert!( + matches!(err, deltaforge_core::SinkError::Connect { .. } | deltaforge_core::SinkError::Backpressure { .. 
}), + "expected Connect or Backpressure error, got: {:?}", + err + ); + + Ok(()) +} diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index b3aff62..d3bec5c 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -15,6 +15,7 @@ - [Redis](sinks/redis.md) - [Kafka](sinks/kafka.md) - [NATS](sinks/nats.md) + - [HTTP/Webhook](sinks/http.md) - [Envelopes and Encodings](envelopes.md) - [Dynamic Routing](routing.md) - [Outbox Pattern Support](outbox.md) diff --git a/docs/src/sinks/README.md b/docs/src/sinks/README.md index afb1db9..5854397 100644 --- a/docs/src/sinks/README.md +++ b/docs/src/sinks/README.md @@ -33,6 +33,7 @@ sinks: | | [`kafka`](kafka.md) | Kafka producer sink | | | [`nats`](nats.md) | NATS JetStream sink | | | [`redis`](redis.md) | Redis stream sink | +| | [`http`](http.md) | HTTP/Webhook sink | ## Multiple sinks in one pipeline @@ -116,6 +117,7 @@ This architecture avoids the common CDC pitfall where the slowest sink becomes a | Kafka (`exactly_once: false`) | At-least-once (idempotent) | Retries deduped; crash-replay produces duplicates | Dedup by event ID | | NATS JetStream | At-least-once + server dedup | `Nats-Msg-Id` header within `duplicate_window` | Configure `duplicate_window` | | Redis Streams | At-least-once + consumer dedup | `idempotency_key` field in XADD payload | Check key before processing | +| HTTP/Webhook | At-least-once | Retry on 5xx/timeout; no server-side dedup | Consumer must be idempotent (use event `id`) | "Exactly-once" means DeltaForge guarantees no duplicates without consumer cooperation. All other sinks are "at-least-once" with a stated dedup mechanism. diff --git a/docs/src/sinks/http.md b/docs/src/sinks/http.md new file mode 100644 index 0000000..940b669 --- /dev/null +++ b/docs/src/sinks/http.md @@ -0,0 +1,178 @@ +# HTTP/Webhook sink + +The HTTP sink delivers CDC events via HTTP POST (or PUT) to any URL — internal services, serverless functions, third-party APIs, webhooks. 
+ +## When to use HTTP + +HTTP is the universal integration point. Use it when: + +- Your consumer doesn't run Kafka/NATS/Redis +- You need to call a REST API or webhook on every database change +- You want the simplest possible setup (DeltaForge + any HTTP server) +- You're integrating with serverless functions (AWS Lambda, Cloud Functions) + +### Pros and cons + +| Pros | Cons | +|------|------| +| Works with any HTTP server | Higher latency than message queues | +| No infrastructure dependencies | No built-in replay (consumer must be idempotent) | +| Simple auth (Bearer, Basic, headers) | No consumer groups or partitioning | +| URL templates for per-table routing | One request per event (unless batch mode) | + +## Configuration + + + + + + +
+ +```yaml +sinks: + - type: http + config: + id: my-webhook + url: "https://api.example.com/events" + method: POST + headers: + Authorization: "Bearer ${API_TOKEN}" + X-Source: deltaforge + batch_mode: false + required: true + send_timeout_secs: 10 +``` + + + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `id` | string | — | Sink identifier | +| `url` | string | — | Target URL (supports `${path}` templates) | +| `method` | string | `POST` | HTTP method (`POST` or `PUT`) | +| `headers` | map | `{}` | Static headers (values support `${ENV_VAR}` expansion) | +| `batch_mode` | bool | `false` | Send JSON array instead of per-event requests | +| `required` | bool | `true` | Gates checkpoints | +| `send_timeout_secs` | int | `10` | Per-request timeout | +| `batch_timeout_secs` | int | `30` | Batch timeout | +| `connect_timeout_secs` | int | `5` | TCP connection timeout | + +
+ +## Authentication + +All auth is handled via the `headers` map. Values support `${ENV_VAR}` shell expansion; if a referenced variable is not set, expansion fails and the value is sent as written (unexpanded), so make sure every referenced variable is defined. + +```yaml +# Bearer token +headers: + Authorization: "Bearer ${API_TOKEN}" + +# Basic auth +headers: + Authorization: "Basic ${BASIC_AUTH_B64}" + +# Custom API key +headers: + X-API-Key: "${MY_API_KEY}" + +# Static pre-shared signature (computed externally, injected via env; headers are fixed at startup, so this is not a per-request HMAC of the body) +headers: + X-Signature: "${WEBHOOK_SIGNATURE}" +``` + +## URL templates + +Route events to different URLs based on event fields: + +```yaml +# Per-table endpoint +url: "https://api.example.com/cdc/${source.table}" +# → https://api.example.com/cdc/orders +# → https://api.example.com/cdc/customers + +# Per-database endpoint +url: "https://${source.db}.api.internal/events" + +# Static URL (most common) +url: "https://api.example.com/webhook" +``` + +## Batch mode + +By default, the sink sends one HTTP request per event. Enable `batch_mode: true` to send a JSON array of events in a single request: + +```yaml +# Per-event mode (default): one POST per event +batch_mode: false +# Body: {"id": "...", "op": "c", "after": {...}} + +# Batch mode: one POST with JSON array +batch_mode: true +# Body: [{"id": "...", "op": "c", ...}, {"id": "...", "op": "u", ...}] +``` + +Batch mode reduces HTTP overhead but means the consumer must handle arrays. 
+ +## Retry behavior + +| Condition | Behavior | +|-----------|----------| +| 2xx response | Success | +| 408, 429 | Retry with backoff (100ms → 10s, 3 attempts) | +| 5xx | Retry with backoff | +| Connection error | Retry with backoff | +| Timeout | Retry with backoff | +| 401, 403 | Auth error — fail immediately, no retry | +| Other 4xx | Permanent failure — fail batch | + +## Failure modes + +| Failure | Symptoms | DeltaForge behavior | Resolution | +|---------|----------|---------------------|------------| +| **Endpoint unavailable** | Connection refused | Retries with backoff; blocks checkpoint | Restore endpoint | +| **Authentication failure** | 401/403 response | Fails fast, no retry | Fix credentials in headers | +| **Rate limited** | 429 response | Retries with backoff | Reduce throughput or increase rate limit | +| **Timeout** | Request exceeds `send_timeout_secs` | Retries | Increase timeout or fix slow endpoint | +| **URL template error** | Template resolves to empty | Event → DLQ (if enabled) | Fix template or event data | + +## Consuming events + +### Node.js / Express + +```javascript +app.post('/webhook', (req, res) => { + const event = req.body; + console.log(`${event.op} on ${event.source.table}: ${JSON.stringify(event.after)}`); + res.sendStatus(200); +}); +``` + +### Python / Flask + +```python +@app.route('/webhook', methods=['POST']) +def webhook(): + event = request.json + process(event) + return '', 200 +``` + +### Go + +```go +http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) { + var event Event + json.NewDecoder(r.Body).Decode(&event) + process(event) + w.WriteHeader(http.StatusOK) +}) +``` + +## Notes + +- Connection pooling is automatic — `reqwest` reuses TCP connections to the same host +- The `Content-Type` header is set to `application/json` by default +- At-least-once delivery: on crash, events may be re-sent. Consumers should be idempotent. 
+- For per-event dedup, use the `id` field in the event payload (UUID v7, stable across replays) From d847dbf625669965ac8fef04c1eb151979835000 Mon Sep 17 00:00:00 2001 From: vahid Date: Fri, 3 Apr 2026 01:36:27 +0300 Subject: [PATCH 2/4] clippy fixes --- Cargo.lock | 72 ++++++++++++++++++++++++++ crates/sinks/tests/sink_test_common.rs | 1 + 2 files changed, 73 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 4d1a79d..1740b1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2186,9 +2186,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasi 0.14.5+wasi-0.2.4", + "wasm-bindgen", ] [[package]] @@ -2534,6 +2536,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.2", "tower-service", + "webpki-roots 1.0.2", ] [[package]] @@ -3225,6 +3228,12 @@ dependencies = [ "hashbrown 0.15.5", ] +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "matchers" version = "0.2.0" @@ -4372,6 +4381,61 @@ dependencies = [ "memchr", ] +[[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash 2.1.1", + "rustls 0.23.31", + "socket2 0.5.10", + "thiserror 2.0.16", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" +dependencies = [ + "bytes", + "getrandom 0.3.3", + "lru-slab", + "rand 0.9.2", + "ring", + "rustc-hash 2.1.1", + "rustls 0.23.31", + 
"rustls-pki-types", + "slab", + "thiserror 2.0.16", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2 0.5.10", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.40" @@ -4655,6 +4719,8 @@ dependencies = [ "native-tls", "percent-encoding", "pin-project-lite", + "quinn", + "rustls 0.23.31", "rustls-pki-types", "serde", "serde_json", @@ -4662,6 +4728,7 @@ dependencies = [ "sync_wrapper 1.0.2", "tokio", "tokio-native-tls", + "tokio-rustls 0.26.2", "tower 0.5.2", "tower-http 0.6.8", "tower-service", @@ -4669,6 +4736,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots 1.0.2", ] [[package]] @@ -4927,6 +4995,7 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" dependencies = [ + "web-time", "zeroize", ] @@ -5436,6 +5505,7 @@ dependencies = [ "anyhow", "async-nats", "async-trait", + "axum 0.8.4", "chrono", "common", "ctor", @@ -5445,7 +5515,9 @@ dependencies = [ "metrics", "rdkafka", "redis", + "reqwest", "serde_json", + "shellexpand", "testcontainers", "tokio", "tokio-util", diff --git a/crates/sinks/tests/sink_test_common.rs b/crates/sinks/tests/sink_test_common.rs index d804c8a..5df9eda 100644 --- a/crates/sinks/tests/sink_test_common.rs +++ b/crates/sinks/tests/sink_test_common.rs @@ -45,6 +45,7 @@ pub fn make_test_event(id: i64) -> Event { } /// Create a test event with specific data size. 
+#[allow(dead_code)] pub fn make_large_event(id: i64, size_bytes: usize) -> Event { let padding = "x".repeat(size_bytes); Event::new_row( From 0d466f4d6486f5cc6400e0c4f1685cf06a5fbf49 Mon Sep 17 00:00:00 2001 From: vahid Date: Fri, 3 Apr 2026 01:36:54 +0300 Subject: [PATCH 3/4] formatting fixes --- crates/sinks/src/http.rs | 30 ++++++++++------ crates/sinks/tests/http_sink_tests.rs | 52 +++++++++++++++++---------- 2 files changed, 52 insertions(+), 30 deletions(-) diff --git a/crates/sinks/src/http.rs b/crates/sinks/src/http.rs index 7a8d94a..1a3b3f1 100644 --- a/crates/sinks/src/http.rs +++ b/crates/sinks/src/http.rs @@ -98,11 +98,12 @@ impl HttpSink { ); for (key, value) in &cfg.headers { // Expand ${ENV_VAR} in header values. - let expanded = shellexpand::env(value) - .unwrap_or(Cow::Borrowed(value)); - let header_name = key - .parse::() - .map_err(|e| anyhow::anyhow!("invalid header name '{key}': {e}"))?; + let expanded = + shellexpand::env(value).unwrap_or(Cow::Borrowed(value)); + let header_name = + key.parse::().map_err(|e| { + anyhow::anyhow!("invalid header name '{key}': {e}") + })?; let header_value = expanded .parse::() .map_err(|e| { @@ -160,11 +161,11 @@ impl HttpSink { details: e.to_string().into(), } })?; - self.url_template - .resolve_strict(&event_json) - .map_err(|e| SinkError::Routing { + self.url_template.resolve_strict(&event_json).map_err(|e| { + SinkError::Routing { details: e.to_string().into(), - }) + } + }) } fn serialize_event(&self, event: &Event) -> SinkResult> { @@ -355,7 +356,11 @@ impl Sink for HttpSink { let url = url.clone(); let body = array_body.clone(); async move { - debug!(attempt, count = event_count, "sending batch via HTTP"); + debug!( + attempt, + count = event_count, + "sending batch via HTTP" + ); self.do_send(&url, body).await } }, @@ -456,7 +461,10 @@ impl HttpRetryError { fn is_retryable(&self) -> bool { matches!( self, - Self::Connect(_) | Self::Timeout | Self::Network(_) | Self::Retryable(_) + Self::Connect(_) + | 
Self::Timeout + | Self::Network(_) + | Self::Retryable(_) ) } } diff --git a/crates/sinks/tests/http_sink_tests.rs b/crates/sinks/tests/http_sink_tests.rs index 1fbea4a..584fcd0 100644 --- a/crates/sinks/tests/http_sink_tests.rs +++ b/crates/sinks/tests/http_sink_tests.rs @@ -9,7 +9,10 @@ //! ``` use std::collections::HashMap; -use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; +use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, +}; use anyhow::Result; use axum::{Json, Router, extract::State, routing::post}; @@ -20,7 +23,9 @@ use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; mod sink_test_common; -use sink_test_common::{init_test_tracing, make_test_event, make_event_for_table}; +use sink_test_common::{ + init_test_tracing, make_event_for_table, make_test_event, +}; // ============================================================================= // Test HTTP Server @@ -68,16 +73,12 @@ async fn handle_event( axum::http::StatusCode::OK } -async fn handle_401( - State(state): State, -) -> axum::http::StatusCode { +async fn handle_401(State(state): State) -> axum::http::StatusCode { state.request_count.fetch_add(1, Ordering::Relaxed); axum::http::StatusCode::UNAUTHORIZED } -async fn handle_500( - State(state): State, -) -> axum::http::StatusCode { +async fn handle_500(State(state): State) -> axum::http::StatusCode { state.request_count.fetch_add(1, Ordering::Relaxed); axum::http::StatusCode::INTERNAL_SERVER_ERROR } @@ -86,9 +87,7 @@ async fn handle_500( async fn start_test_server( handler: Router, ) -> (u16, tokio::task::JoinHandle<()>) { - let listener = tokio::net::TcpListener::bind("127.0.0.1:0") - .await - .unwrap(); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let port = listener.local_addr().unwrap().port(); let handle = tokio::spawn(async move { axum::serve(listener, handler).await.unwrap(); @@ -129,7 +128,8 @@ async fn http_sink_sends_single_event() -> Result<()> { .with_state(server.clone()); let 
(port, _handle) = start_test_server(app).await; - let cfg = make_http_cfg("test-http", &format!("http://127.0.0.1:{port}/events")); + let cfg = + make_http_cfg("test-http", &format!("http://127.0.0.1:{port}/events")); let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; let event = make_test_event(1); @@ -153,7 +153,8 @@ async fn http_sink_sends_batch_per_event() -> Result<()> { .with_state(server.clone()); let (port, _handle) = start_test_server(app).await; - let cfg = make_http_cfg("test-batch", &format!("http://127.0.0.1:{port}/events")); + let cfg = + make_http_cfg("test-batch", &format!("http://127.0.0.1:{port}/events")); let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; let events: Vec = (0..5).map(make_test_event).collect(); @@ -178,7 +179,10 @@ async fn http_sink_batch_mode_sends_array() -> Result<()> { .with_state(server.clone()); let (port, _handle) = start_test_server(app).await; - let mut cfg = make_http_cfg("test-batch-mode", &format!("http://127.0.0.1:{port}/events")); + let mut cfg = make_http_cfg( + "test-batch-mode", + &format!("http://127.0.0.1:{port}/events"), + ); cfg.batch_mode = true; let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; @@ -205,7 +209,8 @@ async fn http_sink_auth_error_fails_immediately() -> Result<()> { .with_state(server.clone()); let (port, _handle) = start_test_server(app).await; - let cfg = make_http_cfg("test-auth", &format!("http://127.0.0.1:{port}/events")); + let cfg = + make_http_cfg("test-auth", &format!("http://127.0.0.1:{port}/events")); let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; let event = make_test_event(1); @@ -234,7 +239,8 @@ async fn http_sink_5xx_retries() -> Result<()> { .with_state(server.clone()); let (port, _handle) = start_test_server(app).await; - let cfg = make_http_cfg("test-retry", &format!("http://127.0.0.1:{port}/events")); + let cfg = + make_http_cfg("test-retry", &format!("http://127.0.0.1:{port}/events")); let sink = 
HttpSink::new(&cfg, CancellationToken::new(), "test")?; let event = make_test_event(1); @@ -307,8 +313,12 @@ async fn http_sink_custom_headers() -> Result<()> { .with_state(server.clone()); let (port, _handle) = start_test_server(app).await; - let mut cfg = make_http_cfg("test-headers", &format!("http://127.0.0.1:{port}/events")); - cfg.headers.insert("X-Custom".into(), "deltaforge-test".into()); + let mut cfg = make_http_cfg( + "test-headers", + &format!("http://127.0.0.1:{port}/events"), + ); + cfg.headers + .insert("X-Custom".into(), "deltaforge-test".into()); let sink = HttpSink::new(&cfg, CancellationToken::new(), "test")?; @@ -337,7 +347,11 @@ async fn http_sink_connection_refused_retries() -> Result<()> { let err = result.unwrap_err(); // Should be a connection error after retries assert!( - matches!(err, deltaforge_core::SinkError::Connect { .. } | deltaforge_core::SinkError::Backpressure { .. }), + matches!( + err, + deltaforge_core::SinkError::Connect { .. } + | deltaforge_core::SinkError::Backpressure { .. 
} + ), "expected Connect or Backpressure error, got: {:?}", err ); From d2e1c3c2c2df4ffc57558c9d22754c42ac28881b Mon Sep 17 00:00:00 2001 From: vahid Date: Fri, 3 Apr 2026 01:41:40 +0300 Subject: [PATCH 4/4] docs: add HTTP/Webhook sink to README and landing page --- README.md | 3 ++- docs/index.html | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 036d225..ed723a4 100644 --- a/README.md +++ b/README.md @@ -167,7 +167,8 @@ Output: `{"schema":null,"payload":{...}}` - Kafka producer sink (via `rdkafka`) — end-to-end exactly-once via transactional producer - Redis stream sink — idempotency keys for consumer-side dedup - NATS JetStream sink (via `async_nats`) — server-side dedup via `Nats-Msg-Id` - - Dynamic routing: per-event topic/stream/subject via templates or JavaScript + - HTTP/Webhook sink — POST/PUT to any URL with custom headers, URL templates, batch mode + - Dynamic routing: per-event topic/stream/subject/URL via templates or JavaScript - Configurable envelope formats: Native, Debezium, CloudEvents - JSON wire encoding (Avro planned and more to come) diff --git a/docs/index.html b/docs/index.html index d68bb47..d1f1554 100644 --- a/docs/index.html +++ b/docs/index.html @@ -4,8 +4,8 @@ DeltaForge - High-Performance CDC Engine - - + + @@ -922,7 +922,7 @@
Multi-Sink Fan-Out
-
Deliver concurrently to Kafka, Redis, and NATS in a single pipeline. Per-sink independent checkpoints — each sink advances at its own pace. A slow Redis cache won't hold back your Kafka stream.
+
Deliver concurrently to Kafka, Redis, NATS, and HTTP/Webhooks in a single pipeline. Per-sink independent checkpoints — each sink advances at its own pace. A slow Redis cache won't hold back your Kafka stream.