Skip to content

vnvo/deltaforge

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

438 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

DeltaForge

CI Release Docs GHCR Docker Pulls Arch Coverage Status MSRV License

A versatile, high-performance Change Data Capture (CDC) engine built in Rust.

⚠️ Status: Active development. APIs, configuration, and semantics may change.

DeltaForge streams database changes into downstream systems like Kafka, Redis, and NATS - giving you full control over routing, transformation, and delivery. Built-in schema discovery automatically infers and tracks the shape of your data as it flows through, including deep inspection of nested JSON structures.

DeltaForge is not a DAG based stream processor. It is a focused CDC engine meant to replace tools like Debezium when you need a lighter, cloud-native, and more customizable runtime.

Quick Start

Get DeltaForge running in under 3 minutes:

Minimal Pipeline Config

# pipeline.yaml
apiVersion: deltaforge/v1
kind: Pipeline
metadata:
  name: my-first-pipeline
  tenant: demo

spec:
  source:
    type: mysql
    config:
      id: mysql-src
      dsn: ${MYSQL_DSN}
      tables: [mydb.users]

  processors: []

  sinks:
    - type: kafka
      config:
        id: kafka-sink
        brokers: ${KAFKA_BROKERS}
        topic: users.cdc

Run it with Docker

docker run --rm \
  -e MYSQL_DSN="mysql://user:pass@host:3306/mydb" \
  -e KAFKA_BROKERS="kafka:9092" \
  -v $(pwd)/pipeline.yaml:/etc/deltaforge/pipeline.yaml:ro \
  ghcr.io/vnvo/deltaforge:latest \
  --config /etc/deltaforge/pipeline.yaml

Deploy on Kubernetes

helm install deltaforge ./deploy/helm/deltaforge \
  --set secrets.create=true \
  --set secrets.data.MYSQL_USER=cdc_user \
  --set secrets.data.MYSQL_PASSWORD=s3cret

See the Helm chart README for full configuration.

That's it! DeltaForge streams changes from mydb.users to Kafka.

Want Debezium-compatible output?

sinks:
  - type: kafka
    config:
      id: kafka-sink
      brokers: ${KAFKA_BROKERS}
      topic: users.cdc
      envelope:
        type: debezium

Output: {"schema":null,"payload":{...}}

📘 Full docs · Configuration reference

The Tech

Built with Sources Processors Sinks Encodings Output Formats
Rust MySQL · PostgreSQL JavaScript · Outbox · Flatten · Filter Kafka · Redis · NATS · HTTP JSON · Avro Native · Debezium · CloudEvents

Features

  • Sources

    • MySQL binlog CDC with GTID support
    • PostgreSQL logical replication via pgoutput
    • Initial snapshot/backfill for existing tables (MySQL and PostgreSQL)
      • resumes at table granularity after interruption, with binlog/WAL retention validation and background guards
    • Automatic failover handling: server identity detection, checkpoint reachability verification, schema drift reconciliation, and configurable halt-on-drift policy
  • Schema Registry

    • Source-owned schema types (source native semantics)
    • Schema change detection and versioning
    • SHA-256 fingerprinting for stable change detection
  • Schema Sensing

    • Automatic schema inference from JSON event payloads
    • Deep inspection for nested JSON structures
    • High-cardinality key detection (session IDs, trace IDs, dynamic maps)
    • Configurable sampling with warmup and cache optimization
    • Drift detection comparing DB schema vs observed data
    • JSON Schema export for downstream consumers
  • Checkpoints

    • Per-sink independent checkpoints — fastest sink never waits for slowest
    • Pluggable backends (File, SQLite with versioning, in-memory)
    • Configurable commit policies (all, required, quorum)
    • Transaction boundary preservation (respect_source_tx: true by default)
  • Dead Letter Queue

    • Poison events (serialization/routing failures) routed to DLQ instead of blocking pipeline
    • REST API: inspect, filter by sink/error, ack, purge
    • Overflow policies: drop_oldest, reject, block
    • Built on existing storage backend — no additional infrastructure
  • Processors

    • JavaScript processor using deno_core:
      • Run user defined functions (UDFs) in JS to transform batches of events
    • Outbox processor:
      • Transactional outbox pattern with routing and raw payload delivery support
    • Flatten processor:
      • Native Rust processor that collapses nested JSON into top-level parent__child keys
    • Filter processor:
      • Native Rust processor for dropping events by op type, table pattern, or field predicates (eq, ne, gt, in, regex, changed, and more)
  • Sinks

    • Kafka producer sink (via rdkafka) — end-to-end exactly-once via transactional producer
    • Redis stream sink — idempotency keys for consumer-side dedup
    • NATS JetStream sink (via async_nats) — server-side dedup via Nats-Msg-Id
    • HTTP/Webhook sink — POST/PUT to any URL with custom headers, URL templates, batch mode
    • Dynamic routing: per-event topic/stream/subject/URL via templates or JavaScript
    • Configurable envelope formats: Native, Debezium, CloudEvents
    • JSON and Avro wire encoding (Avro with Confluent Schema Registry, DDL-derived type-accurate schemas)

Event Output Formats

DeltaForge supports multiple envelope formats for ecosystem compatibility:

Format Output Use Case
native {"op":"c","after":{...},"source":{...}} Lowest overhead, DeltaForge consumers
debezium {"schema":null,"payload":{...}} Drop-in Debezium replacement
cloudevents {"specversion":"1.0","type":"...","data":{...}} CNCF-standard, event-driven systems

🔄 Debezium Compatibility: DeltaForge uses Debezium's schemaless mode (schema: null), which matches Debezium's JsonConverter with schemas.enable=false - the recommended configuration for most Kafka deployments. This provides wire compatibility with existing Debezium consumers without the overhead of inline schemas (~500+ bytes per message).

💡 Migrating from Debezium? If your consumers already use schemas.enable=false, configure envelope: { type: debezium } on your sinks for drop-in compatibility. For consumers expecting Avro with Schema Registry, configure encoding: { type: avro, schema_registry_url: "http://sr:8081" } — DeltaForge produces the standard Confluent wire format.

See Envelope Formats for detailed examples and wire format specifications.

Documentation

Local development helper

Use the bundled dev.sh CLI to spin up the dependency stack and run common workflows consistently:

./dev.sh up     # start Postgres, MySQL, Kafka, Redis, NATS from docker-compose.dev.yml
./dev.sh ps     # view container status
./dev.sh check  # fmt --check + clippy + tests (matches CI)

See the Development guide for the full layout and additional info.

Container image

Pre-built multi-arch images (amd64/arm64) are available:

# From GitHub Container Registry
docker pull ghcr.io/vnvo/deltaforge:latest

# From Docker Hub
docker pull vnvohub/deltaforge:latest

# Debug variant (includes shell)
docker pull ghcr.io/vnvo/deltaforge:latest-debug

Or build locally:

docker build -t deltaforge:local .

Run it by mounting your pipeline specs (environment variables are expanded inside the YAML) and exposing the API and metrics ports:

docker run --rm \
  -p 8080:8080 -p 9000:9000 \
  -v $(pwd)/examples/dev.yaml:/etc/deltaforge/pipelines.yaml:ro \
  -v deltaforge-checkpoints:/app/data \
  deltaforge:local \
  --config /etc/deltaforge/pipelines.yaml

or with env variables to be expanded inside the provided config:

# pull the container
docker pull ghcr.io/vnvo/deltaforge:latest

# run it
docker run --rm \
  -p 8080:8080 -p 9000:9000 \
  -e MYSQL_DSN="mysql://user:pass@host:3306/db" \
  -e KAFKA_BROKERS="kafka:9092" \
  -v $(pwd)/pipeline.yaml:/etc/deltaforge/pipeline.yaml:ro \
  -v deltaforge-checkpoints:/app/data \
  ghcr.io/vnvo/deltaforge:latest \
  --config /etc/deltaforge/pipeline.yaml

The container runs as a non-root user, writes checkpoints to /app/data/df_checkpoints.json, and listens on 0.0.0.0:8080 for the control plane API with metrics served on :9000.

Architecture Highlights

Delivery Guarantees

DeltaForge supports end-to-end exactly-once (Kafka transactions), at-least-once with server-side dedup (NATS), and at-least-once with consumer-side dedup (Redis). Checkpoints are saved only after sink acknowledgement — never before.

Each sink maintains its own independent checkpoint. The fastest sink is never held back by the slowest. On restart, the source replays from the minimum checkpoint across all sinks.

Source → Processor → Sinks (deliver concurrently) → Policy check → Per-sink checkpoints

Transaction boundaries are preserved: all rows from one database transaction appear in the same batch and are delivered atomically to each sink (respect_source_tx: true by default).

📘 Full details: Guarantees & Correctness

Schema-Checkpoint Correlation

The schema registry tracks schema versions with sequence numbers and optional checkpoint correlation. During replay, events are interpreted with the schema that was active when they were produced - even if the table structure has since changed.

Source-Owned Schemas

Unlike tools that normalize all databases to a universal type system, DeltaForge lets each source define its own schema semantics. MySQL schemas capture MySQL types (bigint(20) unsigned, json), PostgreSQL schemas preserve arrays and custom types. No lossy normalization, no universal type maintenance burden.

API

The REST API exposes JSON endpoints for liveness, readiness, and pipeline lifecycle management. Routes key pipelines by the metadata.name field from their specs and return PipeInfo payloads that include the pipeline name, status, and full configuration.

Health

  • GET /health - lightweight liveness probe returning ok.
  • GET /ready - readiness view returning {"status":"ready","pipelines":[...]} with the current pipeline states.

Pipeline management

  • GET /pipelines - list all pipelines with their current status and config.
  • POST /pipelines - create a new pipeline from a full PipelineSpec document.
  • GET /pipelines/{name} - get a single pipeline by name.
  • PATCH /pipelines/{name} - apply a partial JSON patch to an existing pipeline (e.g., adjust batch or connection settings) and restart it with the merged spec.
  • DELETE /pipelines/{name} - permanently delete a pipeline.
  • POST /pipelines/{name}/pause - pause ingestion and processing for the pipeline.
  • POST /pipelines/{name}/resume - resume a paused pipeline.
  • POST /pipelines/{name}/stop - stop a running pipeline.

Schema endpoints

  • GET /pipelines/{name}/schemas - list DB schemas for the pipeline.
  • GET /pipelines/{name}/sensing/schemas - list inferred schemas (from sensing).
  • GET /pipelines/{name}/sensing/schemas/{table} - get inferred schema details.
  • GET /pipelines/{name}/sensing/schemas/{table}/json-schema - export as JSON Schema.
  • GET /pipelines/{name}/sensing/schemas/{table}/classifications - get dynamic map classifications.
  • GET /pipelines/{name}/drift - get drift detection results.
  • GET /pipelines/{name}/sensing/stats - get schema sensing cache statistics.

Configuration schema

Pipelines are defined as YAML documents that map directly to the internal PipelineSpec type. Environment variables are expanded before parsing, so secrets and URLs can be injected at runtime.

Full Example

metadata:
  name: orders-mysql-to-kafka
  tenant: acme

spec:
  sharding:
    mode: hash
    count: 4
    key: customer_id

  source:
    type: mysql
    config:
      id: orders-mysql
      dsn: ${MYSQL_DSN}
      tables:
        - shop.orders
        - shop.outbox
      outbox:
        tables: ["shop.outbox"]
      snapshot:
        mode: initial

  processors:
    - type: javascript
      id: my-custom-transform
      inline: |
        function processBatch(events) {
          return events;
        }
      limits:
        cpu_ms: 50
        mem_mb: 128
        timeout_ms: 500

  sinks:
    - type: kafka
      config:
        id: orders-kafka
        brokers: ${KAFKA_BROKERS}
        topic: orders
        envelope:
          type: debezium
        encoding: json
        required: true
        exactly_once: false
    - type: redis
      config:
        id: orders-redis
        uri: ${REDIS_URI}
        stream: orders
        envelope:
          type: native
        encoding: json
  
  batch:
    max_events: 500
    max_bytes: 1048576
    max_ms: 1000
    respect_source_tx: true

  commit_policy:
    mode: quorum
    quorum: 2

  schema_sensing:
    enabled: true
    deep_inspect:
      enabled: true
      max_depth: 3
    sampling:
      warmup_events: 50
      sample_rate: 5
    high_cardinality:
      enabled: true
      min_events: 100

Key fields

Field Description
metadata
name Pipeline identifier (used in API routes and metrics)
tenant Business-oriented tenant label
spec.source Database source - MySQL, PostgreSQL, etc.
type mysql, postgres, etc.
config.id Unique identifier for checkpoints
config.dsn Connection string (supports ${ENV_VAR})
config.tables Table patterns to capture
config.outbox Tag outbox tables/prefixes with __outbox sentinel for the outbox processor
config.snapshot Initial load: mode (never/initial/always), chunk_size, max_parallel_tables
config.on_schema_drift adapt (default) — continue after failover schema drift; halt — stop for operator intervention
spec.processors Optional transforms - see Processors
type javascript, outbox, flatten, filter
inline JavaScript code for batch processing
limits CPU, memory, and timeout limits
spec.sinks One or more sinks - see Sinks
type kafka, redis, nats, or http
config.envelope Output format: native, debezium, or cloudevents - see Envelopes
config.encoding Wire encoding: json (default) or avro (with Schema Registry) - see Envelopes
config.required Whether sink must ack for checkpoint (true default)
spec.batch Commit unit thresholds - see Batching
max_events Flush after N events (default: 500)
max_bytes Flush after size limit (default: 1MB)
max_ms Flush after time (default: 1000ms)
respect_source_tx Keep source transactions intact (true default)
spec.commit_policy Checkpoint gating - see Commit policy
mode all, required (default), or quorum
quorum Number of sinks for quorum mode
spec.schema_sensing Runtime schema inference - see Schema sensing
enabled Enable schema sensing (false default)
deep_inspect Nested JSON inspection settings
sampling Sampling rate and warmup config
high_cardinality Dynamic key detection settings

📘 Full reference: Configuration docs

View actual examples: Example Configurations

Roadmap

  • Outbox pattern support
  • Flatten processor
  • Filter processor
  • Persistent schema registry (SQLite, then PostgreSQL)
  • Snapshot/backfill (initial load for existing tables)
  • HTTP/Webhook sink
  • Dead letter queue with per-event routing
  • Per-sink independent checkpoints
  • Exactly-once delivery (Kafka transactions)
  • Avro encoding with Confluent Schema Registry
  • Helm chart for Kubernetes deployment
  • S3/Parquet sink
  • MongoDB source
  • Event replay from DLQ journal
  • Kubernetes operator (PipelineTemplate + PipelinePool)
  • Protobuf encoding

License

Licensed under either of

at your option.

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in this project by you shall be dual licensed as above, without additional terms or conditions.

About

A versatile, high-performance Change Data Capture (CDC) engine built in Rust. Streams database changes into downstream systems like Kafka, Redis, NATS and etc, giving you full control over routing, transformation, and delivery.

Topics

Resources

License

Unknown and 2 other licenses found

Licenses found

Unknown
LICENSE
Unknown
LICENSE-APACHE
MIT
LICENSE-MIT

Stars

Watchers

Forks

Packages

 
 
 

Contributors