Skip to content

feat(itch-source-kafka): KafkaSource consuming an upstream Kafka topic (#57)#114

Merged
joaquinbejar merged 3 commits intomainfrom
feat/57-itch-source-kafka
May 6, 2026
Merged

feat(itch-source-kafka): KafkaSource consuming an upstream Kafka topic (#57)#114
joaquinbejar merged 3 commits intomainfrom
feat/57-itch-source-kafka

Conversation

@joaquinbejar
Copy link
Copy Markdown
Owner

Closes #57.

Summary

New crate `itch-source-kafka` implementing `MessageSource` (via `Stream<Item = Result<Message, SourceError>>`) by consuming an upstream Kafka topic where each record value is a serialized ITCH message.

  • `KafkaSourceConfig { brokers, group_id, topic, auto_offset_reset, additional_props }`.
  • `AutoOffsetReset { Earliest, Latest }`.
  • `KafkaSource::connect(config)` validates non-empty broker/group/topic and subscribes.
  • `KafkaSource::with_decoder` for custom record framing (JSON, Avro, length-prefixed, etc.).
  • `impl Stream for KafkaSource` using rdkafka's `StreamConsumer` (Option-A fresh-MessageStream-per-poll pattern).
  • `KafkaSourceError` (`#[non_exhaustive]`): `Kafka`, `Decode`, `Config { reason }`.

Per-record outcomes

Outcome Yield log
`Ok` payload, decoder ok `Ok(Message)` `debug!`
`Ok` payload, decode error `Err(SourceError::Backend(...))` `warn!`
`Ok` with `None` payload `Err(SourceError::Backend("no payload"))` `warn!`
`Err(KafkaError)` `Err(SourceError::Backend(...))` `warn!`

Dependencies

`rdkafka 0.36` with `cmake-build` + `tokio` features (`default-features = false`). Adds ~2 minutes of first-time CI build but is bullet-proof against host config drift.

Test plan

11 tests; 4 integration tests are skip-not-fail when `ITCH_KAFKA_BROKERS` is unset (CI default):

  • 5 unit tests (always run).

  • `connect_subscribes_to_topic`, `produce_then_consume_round_trip` (100 messages), `bad_payload_yields_backend_error_then_recovers`, `with_decoder_overrides_default`.

  • 2 doctests (Quick Start + decoder, both `rust,no_run`).

  • `cargo clippy --workspace --all-targets --all-features -- -D warnings`.

  • `cargo fmt --all --check`.

  • `cargo test -p itch-source-kafka --all-features`.

  • `RUSTDOCFLAGS=-D warnings cargo doc --workspace --no-deps`.

#57)

New crate `itch-source-kafka` implementing `MessageSource` (via
`Stream<Item = Result<Message, SourceError>>`) by consuming an
upstream Kafka topic where each record value is a serialized ITCH
message. Per docs/ITCH-SOURCE.md §4 + ADR-0012 Future Work.

Use case: an upstream system (matching engine, market-data fan-out,
audit pipeline) publishes ITCH messages to Kafka; this crate lets
any itch-rs publisher consume that topic and re-emit over
SoupBinTCP / MoldUDP64.

Public surface:
- `KafkaSourceConfig { brokers, group_id, topic, auto_offset_reset,
  additional_props }` — required strings (no Default; the three
  Kafka identifiers have no localhost-equivalent).
- `AutoOffsetReset { Earliest, Latest }` — `Copy`.
- `KafkaSource::connect(config)` — async; builds rdkafka
  ClientConfig, validates non-empty brokers/group_id/topic
  (returns `KafkaSourceError::Config { reason }` on failure),
  creates StreamConsumer, calls `subscribe(&[topic])`. Issues
  `tracing::info!` on success.
- `KafkaSource::with_decoder<F>(self, decoder)` — replace the
  default `Message::decode` decoder for users whose records use
  custom framing (JSON, Avro, length-prefixed, etc.).
- `KafkaSource::topic()` — read-only accessor.
- `impl Stream for KafkaSource` (Item = Result<Message, SourceError>)
  using rdkafka's StreamConsumer + the Option-A "fresh
  MessageStream per poll" pattern (rdkafka retains the record queue
  on the consumer; the per-poll MessageStream is a thin view).
  Per-record outcomes:
  - Ok payload   -> decoder -> Ok(Message) (debug! per record).
  - Err KafkaError -> Err(SourceError::Backend(...)) (warn!).
  - None payload -> Err(SourceError::Backend("no payload")) (warn!).
- `KafkaSourceError` (`#[non_exhaustive]`): Kafka(#[from]
  KafkaError), Decode(ProtocolError), Config { reason }.

Dependencies: rdkafka 0.36 with `cmake-build` + `tokio` features
(`default-features = false`), futures, async-trait, thiserror,
tracing, tokio. `cmake-build` compiles librdkafka 2.12 from
source so CI doesn't depend on system `librdkafka-dev` -- adds
~2 minutes of first-time build to CI but is bullet-proof against
host config drift.

11 tests:
- 5 unit tests in lib.rs (AutoOffsetReset Display, config
  defaults, decoder substitution, error variant Display, etc.)
  -- run unconditionally even with no Kafka.
- 4 integration tests gated on env var ITCH_KAFKA_BROKERS:
  connect_subscribes_to_topic, produce_then_consume_round_trip,
  bad_payload_yields_backend_error_then_recovers,
  with_decoder_overrides_default. CI default (no env var) skips
  in <500ms total via tokio::time::timeout(Duration::from_secs(5),
  ...) on connect.
- 2 doctests (Quick Start `rust,no_run`, decoder example
  `rust,no_run`).

Auto-commit policy: enable.auto.commit = true (rdkafka default).
Documented in the README and on `connect`'s rustdoc; future
versions can expose explicit-commit mode for at-least-once
publishers.

All gates clean:
- `cargo clippy -p itch-source-kafka --all-features -- -D warnings`
- `cargo fmt --all --check`
- `cargo test -p itch-source-kafka --all-features` (5 unit + 4
  integration skipped + 2 doc)
- `RUSTDOCFLAGS=-D warnings cargo doc --workspace --no-deps`

Closes #57.

Signed-off-by: Joaquin Bejar <jb@taunais.com>
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new itch-source-kafka crate that implements the itch_source::MessageSource marker trait by consuming an upstream Kafka topic (record value = encoded ITCH message), enabling Kafka-fed publishers in the itch-rs ecosystem.

Changes:

  • Introduces KafkaSource, KafkaSourceConfig, and AutoOffsetReset, plus KafkaSourceError for construction-time failures.
  • Implements Stream<Item = Result<Message, SourceError>> over rdkafka::StreamConsumer, with optional custom per-record decoding via with_decoder.
  • Adds crate docs (README/CHANGELOG), unit tests, and Kafka-backed integration tests (skip-not-fail when ITCH_KAFKA_BROKERS is unset); wires the crate into the workspace.

Reviewed changes

Copilot reviewed 6 out of 7 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
Cargo.toml Adds crates/itch-source-kafka to workspace members.
Cargo.lock Locks new dependencies introduced by rdkafka/cmake-build and the new crate.
crates/itch-source-kafka/Cargo.toml New crate manifest with rdkafka + tokio/tracing/futures deps.
crates/itch-source-kafka/src/lib.rs Core KafkaSource implementation, config/types, stream polling, and unit tests.
crates/itch-source-kafka/tests/integration.rs Real-Kafka integration tests (env-gated) for connect/round-trip/decoder/recovery.
crates/itch-source-kafka/README.md Usage docs, custom decoder example, and testing instructions.
crates/itch-source-kafka/CHANGELOG.md Initial changelog entries for the new crate.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread crates/itch-source-kafka/Cargo.toml
Comment thread crates/itch-source-kafka/src/lib.rs Outdated
Comment thread crates/itch-source-kafka/src/lib.rs Outdated
Comment thread crates/itch-source-kafka/src/lib.rs Outdated
Comment thread crates/itch-source-kafka/tests/integration.rs
Comment thread crates/itch-source-kafka/tests/integration.rs Outdated
Comment thread crates/itch-source-kafka/tests/integration.rs Outdated
Apply 7 inline-comment fixes from Copilot's review of
itch-source-kafka:

1. Drop unused `async-trait = "0.1"` from Cargo.toml — no
   `#[async_trait]` impls in this crate.
2. Rewrite `KafkaSourceError::Decode` rustdoc — the variant is
   reserved for future caller-driven decoder construction errors;
   `connect` does not perform a per-record probe. Per-record
   decode failures during streaming surface through
   `SourceError::Backend`, never as `KafkaSourceError::Decode`.
3. Per-record decode failures: replace
   `SourceError::Backend(format!(\"decode error: {err}\").into())`
   with `SourceError::Backend(Box::new(err))` so downstream
   logging keeps the original `ProtocolError` and its
   `Error::source` chain.
4. Per-record consumer errors: same pattern —
   `SourceError::Backend(Box::new(err))` preserves the
   `KafkaError` type and source chain.
5. Rename `connect_subscribes_to_topic` ->
   `connect_returns_source_with_topic_accessor`. The test only
   asserts `topic()` accessor; end-to-end subscription
   correctness lives in `produce_then_consume_round_trip`.
6. Reword the producer-key comment — a constant key collapses to
   one partition, but the partition number depends on
   partitioner / hash / partition count, not necessarily 0.
7. Replace the hard-coded `\"after 5s\"` skip log with
   `\"after {CONNECT_TIMEOUT:?}\"` so the message stays correct
   if the constant changes.

All gates clean:
- `cargo clippy -p itch-source-kafka --all-features --all-targets -- -D warnings`
- `cargo fmt --all --check`
- `cargo test -p itch-source-kafka --all-features` (5 unit + 4 integration skipped + 2 doc)

Signed-off-by: Joaquin Bejar <jb@taunais.com>
@joaquinbejar
Copy link
Copy Markdown
Owner Author

Thanks for the review. All 7 inline comments addressed in 1afbe2a, gates green locally.

The new `itch-source-kafka` crate (#57) depends on `rdkafka 0.36`
with `cmake-build`, which compiles `librdkafka 2.12.1` from source.
librdkafka 2.12 dropped the WITH_CURL=0 escape hatch and now
mandates `libcurl4-openssl-dev` as a build-time header dependency
for OAuth/OIDC token refresh — even with all networking features
otherwise off. The CMake step fails on the GitHub Actions runner
with:

    fatal error: curl/curl.h: No such file or directory

Add `apt-get install -y libcurl4-openssl-dev libssl-dev libsasl2-dev`
to every Cargo job that builds the workspace (clippy, test,
build-release, doc, msrv, coverage). librdkafka also pulls openssl
and SASL headers under the same build, so include both pre-emptively
to avoid a second round-trip if librdkafka 2.12.x's defaults shift
again.

Fuzz / fmt jobs are unaffected — they don't compile rdkafka.

Signed-off-by: Joaquin Bejar <jb@taunais.com>
@joaquinbejar joaquinbejar merged commit 78516bd into main May 6, 2026
11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

itch-source-kafka — KafkaSource consuming an upstream Kafka topic

2 participants