lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.17.2 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.18.0
- old
+ new
@@ -256,10 +256,13 @@
# config :ca_trusted_fingerprint, :validate => :sha_256_hex
include LogStash::PluginMixins::CATrustedFingerprintSupport
attr_reader :pipeline_id
+ BUILD_FLAVOR_SERVERLESS = 'serverless'.freeze
+ DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze
+
def initialize(params={})
super(params)
if docinfo_target.nil?
@docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
@@ -303,17 +306,23 @@
@logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')
transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')
- @client = Elasticsearch::Client.new(
+ @client_options = {
:hosts => hosts,
:transport_options => transport_options,
:transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore,
:ssl => ssl_options
- )
+ }
+
+ @client = Elasticsearch::Client.new(@client_options)
+
test_connection!
+
+ setup_serverless
+
@client
end
def run(output_queue)
@@ -433,11 +442,11 @@
def scroll_request(scroll_id)
@client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
end
- def search_request(options)
+ def search_request(options={})
@client.search(options)
end
def hosts_default?(hosts)
hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? )
@@ -664,9 +673,30 @@
def test_connection!
@client.ping
rescue Elasticsearch::UnsupportedProductError
raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
+ end
+
+ # recreate client with default header when it is serverless
+ # verify the header by sending GET /
+ def setup_serverless
+ if serverless?
+ @client_options[:transport_options][:headers].merge!(DEFAULT_EAV_HEADER)
+ @client = Elasticsearch::Client.new(@client_options)
+ @client.info
+ end
+ rescue => e
+ @logger.error("Failed to retrieve Elasticsearch info", message: e.message, exception: e.class, backtrace: e.backtrace)
+ raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
+ end
+
+ def build_flavor
+ @build_flavor ||= @client.info&.dig('version', 'build_flavor')
+ end
+
+ def serverless?
+ @is_serverless ||= (build_flavor == BUILD_FLAVOR_SERVERLESS)
end
module URIOrEmptyValidator
##
# @override to provide :uri_or_empty validator