Skip to content

Commit 7805ffa

Browse files
committed
fix: add es7-8 compat
Signed-off-by: Bence Csati <bence.csati@axoflow.com>
1 parent 5a10d6e commit 7805ffa

File tree

1 file changed

+32
-8
lines changed

1 file changed

+32
-8
lines changed

lib/fluent/plugin/out_elasticsearch.rb

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,12 @@ def initialize(retry_stream)
178178
config_param :use_legacy_template, :bool, :default => true
179179
config_param :catch_transport_exception_on_retry, :bool, :default => true
180180
config_param :target_index_affinity, :bool, :default => false
181+
config_param :force_content_type, :string, :default => nil,
182+
:desc => "Force specific Content-Type header (e.g., 'application/json' or 'application/x-ndjson'). " \
183+
"Overrides automatic version detection. Useful for mixed ES environments."
184+
config_param :ignore_version_content_type_mismatch, :bool, :default => false,
185+
:desc => "Automatically fallback to application/json if Content-Type version mismatch occurs. " \
186+
"Enables seamless operation across mixed ES 7/8/9 environments."
181187

182188
config_section :metadata, param_name: :metainfo, multi: false do
183189
config_param :include_chunk_id, :bool, :default => false
@@ -362,14 +368,31 @@ class << self
362368
@type_name = nil
363369
end
364370
@accept_type = nil
365-
if @content_type != ES9_CONTENT_TYPE
371+
372+
# Only set ES9 content type if not overridden and mismatch handling is not enabled
373+
if @content_type.to_s != ES9_CONTENT_TYPE && !@ignore_version_content_type_mismatch
366374
log.trace "Detected ES 9.x or above: Content-Type will be adjusted."
367375
@content_type = ES9_CONTENT_TYPE
368376
@accept_type = ES9_CONTENT_TYPE
377+
elsif @ignore_version_content_type_mismatch
378+
log.info "Ignoring ES version for Content-Type, using application/json for compatibility"
379+
@content_type = :'application/json'
380+
@accept_type = nil
369381
end
370382
end
371383
end
372384

385+
if @content_type.nil?
386+
log.warn "content_type was nil, defaulting to application/json"
387+
@content_type = :'application/json'
388+
end
389+
390+
if @force_content_type
391+
log.info "Forcing Content-Type to: #{@force_content_type}"
392+
@content_type = @force_content_type
393+
@accept_type = nil
394+
end
395+
373396
if @validate_client_version && !dry_run?
374397
if @last_seen_major_version != client_library_version.to_i
375398
raise Fluent::ConfigError, <<-EOC
@@ -624,19 +647,20 @@ def client(host = nil, compress_connection = false)
624647
else
625648
{}
626649
end
627-
headers = { 'Content-Type' => @content_type.to_s }
650+
651+
content_type_value = @content_type ? @content_type.to_s : 'application/json'
652+
accept_type_value = @accept_type ? @accept_type.to_s : nil
653+
content_type_value = 'application/json' if content_type_value.strip.empty?
654+
655+
headers = { 'Content-Type' => content_type_value }
628656
.merge(@custom_headers)
629657
.merge(@api_key_header)
630658
.merge(gzip_headers)
631-
headers.merge!('Accept' => @accept_type) if @accept_type
659+
660+
headers.merge!('Accept' => accept_type_value) if accept_type_value && !accept_type_value.strip.empty?
632661

633662
ssl_options = { verify: @ssl_verify, ca_file: @ca_file}.merge(@ssl_version_options)
634663

635-
transport_options_hash = {
636-
headers: headers || { 'Content-Type' => 'application/json' },
637-
request: { timeout: @request_timeout },
638-
ssl: ssl_options,
639-
}
640664
transport = TRANSPORT_CLASS::Transport::HTTP::Faraday.new(connection_options.merge(
641665
options: {
642666
reload_connections: local_reload_connections,

0 commit comments

Comments
 (0)