A generic Kafka client tuned for our own usage.
Features:
- Messages are expected to be hashes and are BSON-serialized
- Connections will be properly closed on exceptions
- Consumer will stop gracefully on SIGTERM
- At most once consumer semantics are used
- Production via an HTTP proxy (including SSL)

    require "cc/kafka"

    producer = CC::Kafka::Producer.new("kafka://host:1234/topic", "client-id")
    producer.send_message(foo: :bar, baz: :bat)
    producer.close

    consumer = CC::Kafka::Consumer.new("client-id", ["kafka://host:1234", "..."], "topic", 0)
    consumer.on_message do |message|
      # Given the producer above, message will be
      #
      #   {
      #     "foo" => :bar,
      #     "baz" => :bat,
      #     CC::Kafka::MESSAGE_OFFSET_KEY => "topic-0-1",
      #   }
      #
    end
    consumer.start

Note: the value for the MESSAGE_OFFSET_KEY identifies the message's offset
within the given topic and partition as <topic>-<partition>-<offset>. It can
be used by consumers to tie created data to the message that led to it and
prevent duplicate processing.
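
As a rough illustration of the note above, the offset value can be split back into its parts and used as a duplicate guard. This is only a sketch: `OffsetKey`, `parse_offset_key`, and `SeenMessages` are hypothetical names that are not part of CC::Kafka, and it assumes the partition and offset are always non-negative integers.

```ruby
require "set"

# Hypothetical value object; not part of CC::Kafka.
OffsetKey = Struct.new(:topic, :partition, :offset)

# Hypothetical helper: splits "<topic>-<partition>-<offset>" into its parts.
def parse_offset_key(value)
  # Anchor on the two trailing integers so topic names that contain
  # hyphens still parse correctly.
  match = /\A(.+)-(\d+)-(\d+)\z/.match(value)
  raise ArgumentError, "unexpected offset key: #{value.inspect}" unless match

  OffsetKey.new(match[1], Integer(match[2], 10), Integer(match[3], 10))
end

# Hypothetical in-memory guard. A real consumer would more likely persist
# the key, for example as a unique field on the records it creates.
class SeenMessages
  def initialize
    @seen = Set.new
  end

  # Returns true the first time a key is offered and false on repeats.
  def first_time?(offset_key)
    !@seen.add?(offset_key).nil?
  end
end
```

Inside an `on_message` block, the key would be read with `message[CC::Kafka::MESSAGE_OFFSET_KEY]` and the handler could return early when `first_time?` is false.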
- CC::Kafka.offset_model

  Must respond to find_or_create!(attributes) and return an object that responds to set(attributes).

  The attributes used are topic, partition, and current. The object returned from find_or_create! must expose methods for each of these.

  A Minidoc-based module is provided that can be included in client code for an offset model implementation that will work for many clients.

      class KafkaOffset < Minidoc
        include CC::Kafka::OffsetStorage::Minidoc
      end

      CC::Kafka.offset_model = KafkaOffset

  Note: This is only necessary if using Consumer.

- Kafka.logger

  This is optional and defaults to Logger.new(STDOUT). The configured object must have the same interface as the standard Ruby logger.

  Example:

      Kafka.logger = Rails.logger

- Kafka.statsd

  This is optional and defaults to a null object. The configured object should represent a statsd client and respond to the usual methods, increment, time, etc.

- Kafka.ssl_ca_file

  Path to a custom SSL Certificate Authority file.

  Will result in:

      http.ca_file = Kafka.ssl_ca_file

- Kafka.ssl_pem_file

  Path to a custom SSL Certificate (and key) in concatenated, PEM format.

  Will result in:

      pem = File.read(Kafka.ssl_pem_file)
      http.cert = OpenSSL::X509::Certificate.new(pem)
      http.key = OpenSSL::PKey::RSA.new(pem)
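
For clients that do not use Minidoc, the offset model contract described above (a class-level find-or-create method, plus `set` and readers for `topic`, `partition`, and `current`) might be met by something like the following. It is a sketch under assumptions, not the library's implementation: `InMemoryOffset` is a hypothetical name, the attributes are assumed to arrive as a Hash with symbol keys, and records are assumed to be keyed by topic and partition.

```ruby
# Hypothetical in-memory offset model; not part of CC::Kafka.
class InMemoryOffset
  attr_reader :topic, :partition, :current

  # Class-level store, keyed by [topic, partition] (an assumption).
  @records = {}

  class << self
    # Returns the record for a topic/partition pair, creating it on first use.
    def find_or_create!(attributes)
      key = [attributes.fetch(:topic), attributes.fetch(:partition)]
      @records[key] ||= new(attributes)
    end
  end

  def initialize(attributes)
    @topic = attributes[:topic]
    @partition = attributes[:partition]
    @current = attributes[:current]
  end

  # Updates only the attributes that were passed in.
  def set(attributes)
    @topic = attributes[:topic] if attributes.key?(:topic)
    @partition = attributes[:partition] if attributes.key?(:partition)
    @current = attributes[:current] if attributes.key?(:current)
    self
  end
end

# With the gem loaded, the model would then be registered as shown above:
# CC::Kafka.offset_model = InMemoryOffset
```

Offsets kept this way are lost when the process exits, so a model like this is likely only useful in tests or short-lived scripts.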
See LICENSE