diff --git a/lib/poseidon/partition_consumer.rb b/lib/poseidon/partition_consumer.rb index d90a64c..73f5a9a 100644 --- a/lib/poseidon/partition_consumer.rb +++ b/lib/poseidon/partition_consumer.rb @@ -139,6 +139,20 @@ def next_offset @offset end + # Causes the next fetch to proceed from this offset, rather than + # +next_offset+. The effect is similar to reopening a consumer, but + # much faster. Nonetheless, seeks shouldn't be abused because of the + # remoteness of Kafka. + # + # @param [Integer,Symbol] offset : the offset, similar to +initialize+. + # The symbols :earliest_offset and :latest_offset are explicitly okay. + # + # @return [Nil] + def seek_to_offset(new_offset) + @offset = new_offset + nil + end + # Close the connection to the kafka broker # # @return [Nil] diff --git a/spec/unit/partition_consumer_spec.rb b/spec/unit/partition_consumer_spec.rb index 8f4a877..4fcd278 100644 --- a/spec/unit/partition_consumer_spec.rb +++ b/spec/unit/partition_consumer_spec.rb @@ -39,6 +39,10 @@ pc = PartitionConsumer.new("test_client", "localhost", 9092, "test_topic", 0, :earliest_offset) expect(pc.next_offset).to eq(100) + pc.seek_to_offset(55) + expect(pc.next_offset).to eq(55) + pc.seek_to_offset(:earliest_offset) + expect(pc.next_offset).to eq(100) # as before end end