Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions lib/poseidon/partition_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,20 @@ def next_offset
@offset
end

# Causes the next fetch to proceed from this offset, rather than
# +next_offset+. The effect is similar to reopening a consumer, but
# much faster. Nonetheless, seeks shouldn't be abused because of the
# remoteness of Kafka.
#
# @param [Integer,Symbol] offset : the offset, similar to +initialize+.
# The symbols :earliest_offset and :latest_offset are explicitly okay.
#
# @return [Nil]
def seek_to_offset(new_offset)
@offset = new_offset
nil
end

# Close the connection to the kafka broker
#
# @return [Nil]
Expand Down
4 changes: 4 additions & 0 deletions spec/unit/partition_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
pc = PartitionConsumer.new("test_client", "localhost", 9092, "test_topic",
0, :earliest_offset)
expect(pc.next_offset).to eq(100)
pc.seek_to_offset(55)
expect(pc.next_offset).to eq(55)
pc.seek_to_offset(:earliest_offset)
expect(pc.next_offset).to eq(100) # as before
end
end

Expand Down