From f6f25b9e01eb258fe76ecb5744efdeb19f61aae5 Mon Sep 17 00:00:00 2001 From: Ovidiu Gheorghioiu Date: Thu, 3 Dec 2015 17:00:08 -0800 Subject: [PATCH] Add seek_to_offset functionality to PartitionConsumer. As it turns out, it's a simple matter of changing where the next fetch will be requested (a state variable). The method accepts the same special offsets as the constructor. --- lib/poseidon/partition_consumer.rb | 14 ++++++++++++++ spec/unit/partition_consumer_spec.rb | 4 ++++ 2 files changed, 18 insertions(+) diff --git a/lib/poseidon/partition_consumer.rb b/lib/poseidon/partition_consumer.rb index d90a64c..73f5a9a 100644 --- a/lib/poseidon/partition_consumer.rb +++ b/lib/poseidon/partition_consumer.rb @@ -139,6 +139,20 @@ def next_offset @offset end + # Causes the next fetch to proceed from this offset, rather than + # +next_offset+. The effect is similar to reopening a consumer, but + # much faster. Nonetheless, seeks shouldn't be abused because of the + # remoteness of Kafka. + # + # @param [Integer,Symbol] offset : the offset, similar to +initialize+. + # The symbols :earliest_offset and :latest_offset are explicitly okay. + # + # @return [Nil] + def seek_to_offset(new_offset) + @offset = new_offset + nil + end + # Close the connection to the kafka broker # # @return [Nil] diff --git a/spec/unit/partition_consumer_spec.rb b/spec/unit/partition_consumer_spec.rb index 8f4a877..4fcd278 100644 --- a/spec/unit/partition_consumer_spec.rb +++ b/spec/unit/partition_consumer_spec.rb @@ -39,6 +39,10 @@ pc = PartitionConsumer.new("test_client", "localhost", 9092, "test_topic", 0, :earliest_offset) expect(pc.next_offset).to eq(100) + pc.seek_to_offset(55) + expect(pc.next_offset).to eq(55) + pc.seek_to_offset(:earliest_offset) + expect(pc.next_offset).to eq(100) # as before end end