From cf870c4b57ccdbd7e5ba8f572c85ef724dcc6c38 Mon Sep 17 00:00:00 2001 From: Yoshihiro MIYAI Date: Sat, 9 Jul 2016 10:28:28 +0900 Subject: [PATCH 1/2] Fix error message_count is too large --- README.md | 3 +++ lib/fluent/plugin/out_gcloud_pubsub.rb | 16 ++++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e2ee625..1a3ab86 100644 --- a/README.md +++ b/README.md @@ -36,11 +36,14 @@ publish dummy json data like `{"message": "dummy", "value": 0}\n{"message": "dum key flush_interval 10 autocreate_topic false + max_messages 1000 ``` - `autocreate_topic` (optional, default: `false`) - If set to `true`, specified topic will be created when it doesn't exist. +- `max_messages` (optional, default: `1000`) + - Publishing messages per request to Cloud Pub/Sub. ref: https://cloud.google.com/pubsub/quotas#other_limits ## Pull messages Use `in_gcloud_pubsub`. diff --git a/lib/fluent/plugin/out_gcloud_pubsub.rb b/lib/fluent/plugin/out_gcloud_pubsub.rb index d80d0bf..f84f6fe 100644 --- a/lib/fluent/plugin/out_gcloud_pubsub.rb +++ b/lib/fluent/plugin/out_gcloud_pubsub.rb @@ -16,6 +16,7 @@ class GcloudPubSubOutput < BufferedOutput config_param :topic, :string, :default => nil config_param :key, :string, :default => nil config_param :autocreate_topic, :bool, :default => false + config_param :max_messages, :integer, :default => 1000 unless method_defined?(:log) define_method("log") { $log } @@ -50,10 +51,8 @@ def write(chunk) end if messages.length > 0 - @client.publish do |batch| - messages.each do |m| - batch.publish m - end + messages.each_slice(@max_messages).each do |msg| + publish msg end end rescue => e @@ -61,5 +60,14 @@ def write(chunk) log.error_backtrace raise e end + + def publish(messages) + log.debug "send message topic:#{@client.name} length:#{messages.length.to_s}" + @client.publish do |batch| + messages.each do |m| + batch.publish m + end + end + end end end From 562d1adcc996005c97c625d410c8ce9b7fcc3e4c Mon Sep 17 00:00:00 2001 From: Yoshihiro MIYAI Date: Sat, 9 Jul 2016 11:39:36 +0900 Subject: [PATCH 2/2] Add max_messages tests --- test/plugin/test_out_gcloud_pubsub.rb | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/test/plugin/test_out_gcloud_pubsub.rb b/test/plugin/test_out_gcloud_pubsub.rb index d128fd9..0a009f7 100644 --- a/test/plugin/test_out_gcloud_pubsub.rb +++ b/test/plugin/test_out_gcloud_pubsub.rb @@ -33,6 +33,7 @@ def test_configure assert_equal('key-test', d.instance.key) assert_equal(false, d.instance.autocreate_topic) assert_equal(1, d.instance.flush_interval) + assert_equal(1000, d.instance.max_messages) end def test_autocreate_topic @@ -59,6 +60,27 @@ def test_autocreate_topic d.instance.start() end + def test_max_messages + d = create_driver(DEFAULT_CONFIG) + + client = mock! + client.name.times(2) { 'topic-test' } + client.publish.times(2) + + pubsub_mock = mock!.topic(anything, anything) { client } + gcloud_mock = mock!.pubsub { pubsub_mock } + stub(Gcloud).new { gcloud_mock } + + time = Time.parse("2016-07-09 11:12:13 UTC").to_i + + # max_messages is default 1000 + 1001.times do |i| + d.emit({"a" => i}, time) + end + + d.run + end + def test_re_raise_errors d = create_driver(DEFAULT_CONFIG) chunk = Fluent::MemoryBufferChunk.new('key', 'data') @@ -66,6 +88,9 @@ def test_re_raise_errors def client.publish raise ReRaisedError end + def client.name + 'test-topic' + end d.instance.instance_variable_set(:@client, client) assert_raises ReRaisedError do