Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ Output: `{"schema":null,"payload":{...}}`
- Kafka producer sink (via `rdkafka`) — end-to-end exactly-once via transactional producer
- Redis stream sink — idempotency keys for consumer-side dedup
- NATS JetStream sink (via `async_nats`) — server-side dedup via `Nats-Msg-Id`
- Dynamic routing: per-event topic/stream/subject via templates or JavaScript
- HTTP/Webhook sink — POST/PUT to any URL with custom headers, URL templates, batch mode
- Dynamic routing: per-event topic/stream/subject/URL via templates or JavaScript
- Configurable envelope formats: Native, Debezium, CloudEvents
- JSON wire encoding (Avro planned and more to come)

Expand Down
62 changes: 62 additions & 0 deletions crates/deltaforge-config/src/sinks_cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub enum SinkCfg {
Kafka(KafkaSinkCfg),
Redis(RedisSinkCfg),
Nats(NatsSinkCfg),
Http(HttpSinkCfg),
}

impl SinkCfg {
Expand All @@ -72,6 +73,7 @@ impl SinkCfg {
Self::Kafka(c) => &c.id,
Self::Redis(c) => &c.id,
Self::Nats(c) => &c.id,
Self::Http(c) => &c.id,
}
}
}
Expand Down Expand Up @@ -332,6 +334,66 @@ pub struct NatsSinkCfg {
pub filter: Option<SinkFilter>,
}

/// HTTP/Webhook sink configuration.
///
/// Delivers events via HTTP POST (or PUT) to any URL. Supports dynamic URL
/// templates, custom headers with env var expansion, and optional batch mode.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpSinkCfg {
/// Unique identifier for this sink.
pub id: String,

/// Target URL. Supports `${path}` templates for per-event routing.
/// Example: `https://api.example.com/cdc/${source.table}`
pub url: String,

/// HTTP method. Default: POST.
#[serde(default = "default_http_method")]
pub method: String,

/// Static headers added to every request. Values support `${ENV_VAR}` expansion.
/// Example: `{"Authorization": "Bearer ${API_TOKEN}", "X-Source": "deltaforge"}`
#[serde(default)]
pub headers: std::collections::HashMap<String, String>,

/// Batch mode: if true, send a JSON array of events in one request.
/// If false (default), send one request per event.
#[serde(default)]
pub batch_mode: bool,

/// Envelope format for event serialization.
#[serde(default)]
pub envelope: EnvelopeCfg,

/// Wire encoding format.
#[serde(default)]
pub encoding: EncodingCfg,

/// Whether this sink must succeed for checkpoint to proceed.
#[serde(default)]
pub required: Option<bool>,

/// Timeout for individual HTTP requests (seconds). Default: 10.
#[serde(default)]
pub send_timeout_secs: Option<u32>,

/// Timeout for batch operations (seconds). Default: 30.
#[serde(default)]
pub batch_timeout_secs: Option<u32>,

/// Timeout for TCP connection establishment (seconds). Default: 5.
#[serde(default)]
pub connect_timeout_secs: Option<u32>,

/// Optional filter applied before delivery.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub filter: Option<SinkFilter>,
}

fn default_http_method() -> String {
"POST".to_string()
}

// ============================================================================
// Conversion helpers (config → core types)
// ============================================================================
Expand Down
5 changes: 4 additions & 1 deletion crates/sinks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ metrics.workspace = true
rdkafka.workspace = true
redis.workspace = true
async-nats = "0.38"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
shellexpand = "3"
tokio.workspace = true
tokio-util.workspace = true
deltaforge-core = { path = "../deltaforge-core" }
deltaforge-config = { path = "../deltaforge-config" }
common = { path = "../common" }

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "net"] }
axum = "0.8"
ctor = "0.6"
chrono.workspace = true
uuid.workspace = true
Expand Down
Loading
Loading