lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.14.0 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.15.0

- line only in 4.14.0 (old)
+ line only in 4.15.0 (new)

@@ -8,10 +8,11 @@
 require 'logstash/plugin_mixins/ecs_compatibility_support'
 require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
 require 'logstash/plugin_mixins/ca_trusted_fingerprint_support'
 require "logstash/plugin_mixins/scheduler"
 require "base64"
+require 'logstash/helpers/loggable_try'
 
 require "elasticsearch"
 require "elasticsearch/transport/transport/http/manticore"
 require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
 require_relative "elasticsearch/patches/_elasticsearch_transport_connections_selector"
@@ -97,10 +98,13 @@
   config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }'
 
   # This allows you to set the maximum number of hits returned per scroll.
   config :size, :validate => :number, :default => 1000
 
+  # The number of retries to run the query. If the query fails after all retries, it logs an error message.
+  config :retries, :validate => :number, :default => 0
+
   # This parameter controls the keepalive time in seconds of the scrolling
   # request and initiates the scrolling process. The timeout applies per
   # round trip (i.e. between the previous scroll request, to the next).
   config :scroll, :validate => :string, :default => "1m"
@@ -219,10 +223,12 @@
     if @slices
       @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
       @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
     end
 
+    @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")
+
     validate_authentication
     fill_user_password_from_cloud_auth
     fill_hosts_from_cloud_id
@@ -260,26 +266,71 @@
       do_run(output_queue)
     end
   end
 
   private
-
+  JOB_NAME = "run query"
   def do_run(output_queue)
     # if configured to run a single slice, don't bother spinning up threads
-    return do_run_slice(output_queue) if @slices.nil? || @slices <= 1
+    if @slices.nil? || @slices <= 1
+      success, events = retryable_slice
+      success && events.each { |event| output_queue << event }
+      return
+    end
 
     logger.warn("managed slices for query is very large (#{@slices}); consider reducing") if @slices > 8
+
+    slice_results = parallel_slice # array of tuple(ok, events)
+
+    # insert events to queue if all slices success
+    if slice_results.all?(&:first)
+      slice_results.flat_map { |success, events| events }
+                   .each { |event| output_queue << event }
+    end
+
+    logger.trace("#{@slices} slices completed")
+  end
+
+  def retryable(job_name, &block)
+    begin
+      stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
+      output = stud_try.try((@retries + 1).times) { yield }
+      [true, output]
+    rescue => e
+      error_details = {:message => e.message, :cause => e.cause}
+      error_details[:backtrace] = e.backtrace if logger.debug?
+      logger.error("Tried #{job_name} unsuccessfully", error_details)
+      [false, nil]
+    end
+  end
+
+
+  # @return [(ok, events)] : Array of tuple(Boolean, [Logstash::Event])
+  def parallel_slice
     pipeline_id = execution_context&.pipeline_id || 'main'
     @slices.times.map do |slice_id|
       Thread.new do
         LogStash::Util::set_thread_name("[#{pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
-        do_run_slice(output_queue, slice_id)
+        retryable_slice(slice_id)
       end
-    end.map(&:join)
+    end.map do |t|
+      t.join
+      t.value
+    end
   end
 
+  # @param slice_id [Integer]
+  # @return (ok, events) [Boolean, Array(Logstash::Event)]
+  def retryable_slice(slice_id=nil)
+    retryable(JOB_NAME) do
+      output = []
+      do_run_slice(output, slice_id)
+      output
+    end
+  end
+
   def do_run_slice(output_queue, slice_id=nil)
     slice_query = @base_query
     slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @slices}) unless slice_id.nil?
 
     slice_options = @options.merge(:body => LogStash::Json.dump(slice_query) )
@@ -312,15 +363,10 @@
   #
   def process_next_scroll(output_queue, scroll_id)
     r = scroll_request(scroll_id)
     r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
     [r['hits']['hits'].any?, r['_scroll_id']]
-  rescue => e
-    # this will typically be triggered by a scroll timeout
-    logger.error("Scroll request error, aborting scroll", message: e.message, exception: e.class)
-    # return no hits and original scroll_id so we can try to clear it
-    [false, scroll_id]
   end
 
   def push_hit(hit, output_queue)
     event = targeted_event_factory.new_event hit['_source']
     set_docinfo_fields(hit, event) if @docinfo
@@ -351,10 +397,10 @@
   rescue => e
     # ignore & log any clear_scroll errors
     logger.warn("Ignoring clear_scroll exception", message: e.message, exception: e.class)
   end
 
-  def scroll_request scroll_id
+  def scroll_request(scroll_id)
     @client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
   end
 
   def search_request(options)
     @client.search(options)
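
For context on how the new option surfaces to users: a minimal pipeline sketch, assuming a local cluster (the hosts, index, and retries values here are illustrative placeholders, not taken from the diff):

input {
  elasticsearch {
    hosts   => ["localhost:9200"]        # assumed placeholder host
    index   => "logs-*"                  # assumed placeholder index
    query   => '{ "sort": [ "_doc" ] }'  # the plugin default shown above
    retries => 3                         # new in 4.15.0; default 0 means no retries
  }
}

With `retries => 3` the query is attempted up to four times in total, since the code passes `(@retries + 1).times` to the retry helper; negative values are rejected at register time by the new `@retries < 0` check.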
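The sliced code path is now all-or-nothing: each slice thread yields a `[ok, events]` tuple, and events are enqueued only when every slice succeeded. A self-contained Ruby sketch of that aggregation contract (names and sample data are hypothetical, not plugin code):

# Hypothetical results from three slices, shaped like retryable_slice's
# return value: a [success, events] tuple per slice.
slice_results = [
  [true, ["event-a", "event-b"]],
  [true, ["event-c"]],
  [false, nil],  # a slice that exhausted its retries
]

queue = []
# Mirrors do_run: enqueue only if every slice reports success.
if slice_results.all?(&:first)
  slice_results.flat_map { |_success, events| events }
               .each { |event| queue << event }
end

puts queue.inspect  # => [] here, because one slice failed; drop the
                    # failing tuple and all three events are enqueued

The same wrapper drives the single-slice path, so a query that fails after all retries produces the error logged by `retryable` and no partial output.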