lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.17.2 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.18.0

- old
+ new

@@ -256,10 +256,13 @@ # config :ca_trusted_fingerprint, :validate => :sha_256_hex include LogStash::PluginMixins::CATrustedFingerprintSupport attr_reader :pipeline_id + BUILD_FLAVOR_SERVERLESS = 'serverless'.freeze + DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze + def initialize(params={}) super(params) if docinfo_target.nil? @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]'] @@ -303,17 +306,23 @@ @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('') transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('') - @client = Elasticsearch::Client.new( + @client_options = { :hosts => hosts, :transport_options => transport_options, :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore, :ssl => ssl_options - ) + } + + @client = Elasticsearch::Client.new(@client_options) + test_connection! + + setup_serverless + @client end def run(output_queue) @@ -433,11 +442,11 @@ def scroll_request(scroll_id) @client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll) end - def search_request(options) + def search_request(options={}) @client.search(options) end def hosts_default?(hosts) hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? ) @@ -664,9 +673,30 @@ def test_connection! @client.ping rescue Elasticsearch::UnsupportedProductError raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch" + end + + # recreate client with default header when it is serverless + # verify the header by sending GET / + def setup_serverless + if serverless? + @client_options[:transport_options][:headers].merge!(DEFAULT_EAV_HEADER) + @client = Elasticsearch::Client.new(@client_options) + @client.info + end + rescue => e + @logger.error("Failed to retrieve Elasticsearch info", message: e.message, exception: e.class, backtrace: e.backtrace) + raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch" + end + + def build_flavor + @build_flavor ||= @client.info&.dig('version', 'build_flavor') + end + + def serverless? + @is_serverless ||= (build_flavor == BUILD_FLAVOR_SERVERLESS) end module URIOrEmptyValidator ## # @override to provide :uri_or_empty validator