From d29f06996cff33a5563186f476378a7d417ed577 Mon Sep 17 00:00:00 2001 From: Bob Potter Date: Thu, 10 Jul 2014 13:46:40 -0500 Subject: [PATCH 1/3] Add an optional response callback to Producer --- lib/poseidon/messages_for_broker.rb | 4 +- lib/poseidon/produce_result.rb | 71 +++++++++++++++++++++++++++++ lib/poseidon/producer.rb | 8 +++- lib/poseidon/sync_producer.rb | 9 ++-- 4 files changed, 85 insertions(+), 7 deletions(-) create mode 100644 lib/poseidon/produce_result.rb diff --git a/lib/poseidon/messages_for_broker.rb b/lib/poseidon/messages_for_broker.rb index e2ba079..9873bf3 100644 --- a/lib/poseidon/messages_for_broker.rb +++ b/lib/poseidon/messages_for_broker.rb @@ -39,12 +39,14 @@ def build_protocol_objects(compression_config) # We can always retry these errors because they mean none of the kafka brokers persisted the message ALWAYS_RETRYABLE = [Poseidon::Errors::LeaderNotAvailable, Poseidon::Errors::NotLeaderForPartition] - def successfully_sent(producer_response) + def successfully_sent(producer_response, callback) failed = [] producer_response.topic_response.each do |topic_response| topic_response.partitions.each do |partition| if ALWAYS_RETRYABLE.include?(partition.error_class) failed.push(*@topics[topic_response.topic][partition.partition]) + else + callback.call(ProduceResult.new(topic_response, partition, @topics[topic_response.topic][partition.partition])) end end end diff --git a/lib/poseidon/produce_result.rb b/lib/poseidon/produce_result.rb new file mode 100644 index 0000000..b057a92 --- /dev/null +++ b/lib/poseidon/produce_result.rb @@ -0,0 +1,71 @@ +module Poseidon + # Represents the result of a produce attempt against a kafka topic. + class ProduceResult + def initialize(produce_topic_response, produce_partition_response, messages) + @produce_topic_response = produce_topic_response + @produce_partition_response = produce_partition_response + @messages = messages + end + + # Was the produce request successful? 
+ # + # NOTE: This will return false if required_acks is > 1 and the leader + # timed out waiting for a response from the replicas. In this case + # trying to resend the messages will likely lead to duplicate messages + # because the leader will have successfully persisted the message. + # + # You can use the `timeout?` method to differentiate between this case + # and other failures. + # + # @return [Boolean] + def success? + produce_partition_response.error == Poseidon::NO_ERROR_CODE + end + + # Did we fail to receive the required number of acks? + # + # @return [Boolean] + def timeout? + produce_partition_response.error == Poseidon::Error::RequestTimedOut + end + + # Return an error if we recieved one. + # + # @return [Poseidon::Error::ProtocolError,Nil] + def error + if !success? + produce_partition_response.error_class + else + nil + end + end + + # The messages the produce request sent. + # + # @return [Array] + def messages + @messages + end + + # The topic we sent the messages to. + # + # @return [String] + def topic + @produce_topic_response.topic + end + + # The partition we sent the message to. + # + # @return [Fixnum] + def partition + @produce_partition_response.partition + end + + # The offset of the first message the broker persisted from this batch. + # + # @return [Fixnum] + def offset + @produce_partition_response.offset + end + end +end diff --git a/lib/poseidon/producer.rb b/lib/poseidon/producer.rb index 8241c02..7c810bd 100644 --- a/lib/poseidon/producer.rb +++ b/lib/poseidon/producer.rb @@ -147,16 +147,20 @@ def initialize(brokers, client_id, options = {}) # @param [Enumerable] messages # Messages must have a +topic+ set and may have a +key+ set. # + # @yieldparam [ProduceResult] callback + # Optional callback which will be triggered _at least once_ for each + # (topic,partition) pair we attempt to send messages to. 
+ # # @return [Boolean] # # @api public - def send_messages(messages) + def send_messages(messages, &callback) raise Errors::ProducerShutdownError if @shutdown if !messages.respond_to?(:each) raise ArgumentError, "messages must respond to #each" end - @producer.send_messages(convert_to_messages_objects(messages)) + @producer.send_messages(convert_to_messages_objects(messages), callback) end # Closes all open connections to brokers diff --git a/lib/poseidon/sync_producer.rb b/lib/poseidon/sync_producer.rb index e6ac006..093ebe4 100644 --- a/lib/poseidon/sync_producer.rb +++ b/lib/poseidon/sync_producer.rb @@ -35,7 +35,7 @@ def initialize(client_id, seed_brokers, options = {}) @broker_pool = BrokerPool.new(client_id, seed_brokers) end - def send_messages(messages) + def send_messages(messages, &callback) return if messages.empty? messages_to_send = MessagesToSend.new(messages, @cluster_metadata) @@ -50,7 +50,7 @@ def send_messages(messages) end messages_to_send.messages_for_brokers(@message_conductor).each do |messages_for_broker| - if sent = send_to_broker(messages_for_broker) + if sent = send_to_broker(messages_for_broker, callback) messages_to_send.successfully_sent(sent) end end @@ -106,13 +106,14 @@ def refresh_metadata(topics) false end - def send_to_broker(messages_for_broker) + def send_to_broker(messages_for_broker, callback) return false if messages_for_broker.broker_id == -1 to_send = messages_for_broker.build_protocol_objects(@compression_config) response = @broker_pool.execute_api_call(messages_for_broker.broker_id, :produce, required_acks, ack_timeout_ms, to_send) - return messages_for_broker.successfully_sent(response) + sent = messages_for_broker.successfully_sent(response, callback) + sent rescue Connection::ConnectionFailedError false end From ae7ffa354babdc92a7dbe83cfe343e28592195e8 Mon Sep 17 00:00:00 2001 From: Bob Potter Date: Thu, 10 Jul 2014 16:53:14 -0500 Subject: [PATCH 2/3] Use tests yo --- lib/poseidon.rb | 1 + 
lib/poseidon/produce_result.rb | 8 ++++---- spec/unit/producer_spec.rb | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/poseidon.rb b/lib/poseidon.rb index 276209c..2ddd3b7 100644 --- a/lib/poseidon.rb +++ b/lib/poseidon.rb @@ -82,6 +82,7 @@ class ProducerShutdownError < StandardError; end require "poseidon/producer" require "poseidon/fetched_message" require "poseidon/partition_consumer" +require "poseidon/produce_result" # Poseidon! require "poseidon/message" diff --git a/lib/poseidon/produce_result.rb b/lib/poseidon/produce_result.rb index b057a92..557cf60 100644 --- a/lib/poseidon/produce_result.rb +++ b/lib/poseidon/produce_result.rb @@ -19,22 +19,22 @@ def initialize(produce_topic_response, produce_partition_response, messages) # # @return [Boolean] def success? - produce_partition_response.error == Poseidon::NO_ERROR_CODE + @produce_partition_response.error == Poseidon::Errors::NO_ERROR_CODE end # Did we fail to receive the required number of acks? # # @return [Boolean] def timeout? - produce_partition_response.error == Poseidon::Error::RequestTimedOut + @produce_partition_response.error_class == Poseidon::Errors::RequestTimedOut end # Return an error if we recieved one. # - # @return [Poseidon::Error::ProtocolError,Nil] + # @return [Poseidon::Errors::ProtocolError,Nil] def error if !success? 
- produce_partition_response.error_class + @produce_partition_response.error_class else nil end diff --git a/spec/unit/producer_spec.rb b/spec/unit/producer_spec.rb index c5680f0..a166f17 100644 --- a/spec/unit/producer_spec.rb +++ b/spec/unit/producer_spec.rb @@ -32,7 +32,7 @@ end it "turns MessagesToSend into Message objects" do - @sync_producer.should_receive(:send_messages).with([an_instance_of(Message)]) + @sync_producer.should_receive(:send_messages).with([an_instance_of(Message)], anything) m = MessageToSend.new("topic", "value") @producer.send_messages([m]) From c703749aa512d5e8be7c48a48f335cd10f680be3 Mon Sep 17 00:00:00 2001 From: Bob Potter Date: Thu, 10 Jul 2014 17:20:56 -0500 Subject: [PATCH 3/3] GOGO --- lib/poseidon/messages_for_broker.rb | 4 +- lib/poseidon/producer.rb | 4 +- lib/poseidon/sync_producer.rb | 8 +- .../multiple_brokers/spec_helper.rb | 2 +- spec/integration/simple/spec_helper.rb | 2 +- spec/unit/produce_result_spec.rb | 95 +++++++++++++++++++ 6 files changed, 109 insertions(+), 6 deletions(-) create mode 100644 spec/unit/produce_result_spec.rb diff --git a/lib/poseidon/messages_for_broker.rb b/lib/poseidon/messages_for_broker.rb index 9873bf3..a3c8435 100644 --- a/lib/poseidon/messages_for_broker.rb +++ b/lib/poseidon/messages_for_broker.rb @@ -46,7 +46,9 @@ def successfully_sent(producer_response, callback) if ALWAYS_RETRYABLE.include?(partition.error_class) failed.push(*@topics[topic_response.topic][partition.partition]) else - callback.call(ProduceResult.new(topic_response, partition, @topics[topic_response.topic][partition.partition])) + if callback + callback.call(ProduceResult.new(topic_response, partition, @topics[topic_response.topic][partition.partition])) + end end end end diff --git a/lib/poseidon/producer.rb b/lib/poseidon/producer.rb index 7c810bd..3bfd89c 100644 --- a/lib/poseidon/producer.rb +++ b/lib/poseidon/producer.rb @@ -151,6 +151,8 @@ def initialize(brokers, client_id, options = {}) # Optional callback which will 
be triggered _at least once_ for each # (topic,partition) pair we attempt to send messages to. # + # Will never be called if required_acks is 0. + # # @return [Boolean] # # @api public @@ -160,7 +162,7 @@ def send_messages(messages, &callback) raise ArgumentError, "messages must respond to #each" end - @producer.send_messages(convert_to_messages_objects(messages), callback) + @producer.send_messages(convert_to_messages_objects(messages), &callback) end # Closes all open connections to brokers diff --git a/lib/poseidon/sync_producer.rb b/lib/poseidon/sync_producer.rb index 093ebe4..c80bfb3 100644 --- a/lib/poseidon/sync_producer.rb +++ b/lib/poseidon/sync_producer.rb @@ -112,8 +112,12 @@ def send_to_broker(messages_for_broker, callback) response = @broker_pool.execute_api_call(messages_for_broker.broker_id, :produce, required_acks, ack_timeout_ms, to_send) - sent = messages_for_broker.successfully_sent(response, callback) - sent + if required_acks != 0 + messages_for_broker.successfully_sent(response, callback) + else + # Client requested 0 acks, assume all were successful + messages_for_broker.messages + end rescue Connection::ConnectionFailedError false end diff --git a/spec/integration/multiple_brokers/spec_helper.rb b/spec/integration/multiple_brokers/spec_helper.rb index b4c1116..1431a7f 100644 --- a/spec/integration/multiple_brokers/spec_helper.rb +++ b/spec/integration/multiple_brokers/spec_helper.rb @@ -38,6 +38,6 @@ def start_first_broker end config.after(:suite) do - $tc.stop + $tc.stop if $tc end end diff --git a/spec/integration/simple/spec_helper.rb b/spec/integration/simple/spec_helper.rb index bda8ccf..b9dc997 100644 --- a/spec/integration/simple/spec_helper.rb +++ b/spec/integration/simple/spec_helper.rb @@ -12,6 +12,6 @@ end config.after(:suite) do - $tc.stop + $tc.stop if $tc end end diff --git a/spec/unit/produce_result_spec.rb b/spec/unit/produce_result_spec.rb new file mode 100644 index 0000000..e613afa --- /dev/null +++ 
b/spec/unit/produce_result_spec.rb @@ -0,0 +1,95 @@ +require 'spec_helper' +include Poseidon::Protocol + +describe ProduceResult do + context "successful result" do + before(:each) do + @messages = [mock('message')] + partition_response = ProducePartitionResponse.new(1, 0, 10) + topic_response = ProduceTopicResponse.new("topic", [partition_response]) + @pr = ProduceResult.new( + topic_response, + partition_response, + @messages + ) + end + + it "is success?" do + expect(@pr.success?).to eq(true) + end + + it "has no error" do + expect(@pr.error).to eq(nil) + end + + it "did not timeout" do + expect(@pr.timeout?).to eq(false) + end + + it "provides topic" do + expect(@pr.topic).to eq("topic") + end + + it "provides partition" do + expect(@pr.partition).to eq(1) + end + + it "provides offset" do + expect(@pr.offset).to eq(10) + end + + it "provides messages" do + expect(@pr.messages).to eq(@messages) + end + end + + context "failed result" do + before(:each) do + @messages = [mock('message')] + partition_response = ProducePartitionResponse.new(1, 2, -1) + topic_response = ProduceTopicResponse.new("topic", [partition_response]) + @pr = ProduceResult.new( + topic_response, + partition_response, + @messages + ) + end + + it "is success?" do + expect(@pr.success?).to eq(false) + end + + it "has error" do + expect(@pr.error).to eq(Poseidon::Errors::InvalidMessage) + end + + it "did not timeout" do + expect(@pr.timeout?).to eq(false) + end + end + + context "timedout" do + before(:each) do + @messages = [mock('message')] + partition_response = ProducePartitionResponse.new(1, 7, -1) + topic_response = ProduceTopicResponse.new("topic", [partition_response]) + @pr = ProduceResult.new( + topic_response, + partition_response, + @messages + ) + end + + it "is success?" do + expect(@pr.success?).to eq(false) + end + + it "has error" do + expect(@pr.error).to eq(Poseidon::Errors::RequestTimedOut) + end + + it "did timeout" do + expect(@pr.timeout?).to eq(true) + end + end +end