From 9370ba03f8e4545f748098203763aa6ab187b4f6 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 29 Jan 2026 05:21:05 +0000 Subject: [PATCH] format: add support for `ndjson_batch` --- CHANGELOG.md | 3 +++ docs/index.asciidoc | 29 +++++++++++++++-------------- lib/logstash/outputs/http.rb | 7 +++++-- logstash-output-http.gemspec | 2 +- spec/outputs/http_spec.rb | 13 +++++++++++++ 5 files changed, 37 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f614c8f..36b3aad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 6.1.0 + - Added support for batched newline-delimited json with `format => ndjson_batch` + ## 6.0.0 - SSL settings that were marked deprecated in version `5.6.0` are now marked obsolete, and will prevent the plugin from starting. - These settings are: diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 78242ca..48237a8 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -146,11 +146,7 @@ Timeout (in seconds) to wait for a connection to be established. Default is `10s Content type -If not specified, this defaults to the following: - -* if format is "json", "application/json" -* if format is "json_batch", "application/json". Each Logstash batch of events will be concatenated into a single array and sent in one request. -* if format is "form", "application/x-www-form-urlencoded" +If not specified, the default value is determined by the <>. [id="plugins-{type}s-{plugin}-cookies"] ===== `cookies` @@ -172,21 +168,25 @@ Should redirects be followed? Defaults to `true` [id="plugins-{type}s-{plugin}-format"] ===== `format` - * Value can be any of: `json`, `json_batch`, `form`, `message` + * Value can be any of: `json`, `ndjson_batch`, `json_batch`, `form`, `message` * Default value is `"json"` Set the format of the http body. -If json_batch, each batch of events received by this output will be placed -into a single JSON array and sent in one request. This is particularly useful -for high throughput scenarios such as sending data between Logstash instaces. - -If form, then the body will be the mapping (or whole event) converted -into a query parameter string, e.g. `foo=bar&baz=fizz...` +When set to a batch-oriented `format`, each batch of events received by this output will be transmitted in a single http request. +This is particularly useful for high throughput scenarios such as sending data between Logstash instances. -If message, then the body will be the result of formatting the event according to message +Non-batch formats send one http request per event, which can have significantly lower throughput. -Otherwise, the event is sent as json. +[cols="<,<,<,<",options="header",] +|======================================================================= +|Format |Batch?|Body|Default <> +|`json_batch`| ✅YES | A JSON-encoded array of events | `application/json` +|`ndjson_batch`| ✅YES | A sequence of newline-delimited json-encoded events | `application/x-ndjson` +|`form`|️️ ⚠️NO | The mapping (or whole event) converted into a query parameter string (e.g. `foo=bar&baz=fizz...`) | `application/x-www-form-urlencoded` +|`message`| ⚠️NO | The result of formatting the event according to <> | `text/plain` +|`json`| ⚠️NO | The result of json-encoding the event | `application/json` +|======================================================================= [id="plugins-{type}s-{plugin}-headers"] ===== `headers` @@ -252,6 +252,7 @@ For example: * Value type is <> * There is no default value for this setting. + * This setting is ignored unless <> is set to `message` diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 5d81698..986f236 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -88,7 +88,7 @@ class PluginInternalQueueLeftoverError < StandardError; end # If message, then the body will be the result of formatting the event according to message # # Otherwise, the event is sent as json. - config :format, :validate => ["json", "json_batch", "form", "message"], :default => "json" + config :format, :validate => ["json", "json_batch", "ndjson_batch", "form", "message"], :default => "json" # Set this to true if you want to enable gzip compression for your http requests config :http_compression, :validate => :boolean, :default => false @@ -112,11 +112,12 @@ def register when "form" ; @content_type = "application/x-www-form-urlencoded" when "json" ; @content_type = "application/json" when "json_batch" ; @content_type = "application/json" + when "ndjson_batch"; @content_type = "application/x-ndjson" when "message" ; @content_type = "text/plain" end end - @is_batch = @format == "json_batch" + @is_batch = %w(json_batch ndjson_batch).include?(@format) @headers["Content-Type"] = @content_type @@ -336,6 +337,8 @@ def event_body(event) event.sprintf(@message) elsif @format == "json_batch" LogStash::Json.dump(event.map {|e| map_event(e) }) + elsif @format == "ndjson_batch" + event.map {|e| LogStash::Json.dump(map_event(e)) + "\n" }.join else encode(map_event(event)) end diff --git a/logstash-output-http.gemspec b/logstash-output-http.gemspec index d10ce6d..225ee20 100644 --- a/logstash-output-http.gemspec +++ b/logstash-output-http.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-http' - s.version = '6.0.0' + s.version = '6.1.0' s.licenses = ['Apache License (2.0)'] s.summary = "Sends events to a generic HTTP or HTTPS endpoint" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/outputs/http_spec.rb b/spec/outputs/http_spec.rb index adceccd..d8d9e1f 100644 --- a/spec/outputs/http_spec.rb +++ b/spec/outputs/http_spec.rb @@ -349,6 +349,19 @@ end + describe "sending the batch as NDJSON" do + let(:config) do + base_config.merge({"url" => url, "http_method" => "post", "format" => "ndjson_batch"}) + end + + let(:expected_body) { events.map {|e| ::LogStash::Json.dump(e) }.join("\n") + "\n" } + let(:events) { [::LogStash::Event.new("a" => 1), ::LogStash::Event.new("b" => 2)]} + let(:expected_content_type) { "application/x-ndjson" } + + include_examples("a received event") + + end + describe "sending the event as a form" do let(:config) { base_config.merge({"url" => url, "http_method" => "post", "pool_max" => 1, "format" => "form"})