lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.17.0 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.17.1

- old
+ new

@@ -254,10 +254,12 @@
   config :target, :validate => :field_reference

   # config :ca_trusted_fingerprint, :validate => :sha_256_hex
   include LogStash::PluginMixins::CATrustedFingerprintSupport

+  attr_reader :pipeline_id
+
   def initialize(params={})
     super(params)

     if docinfo_target.nil?
       @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
@@ -265,10 +267,12 @@
   end

   def register
     require "rufus/scheduler"

+    @pipeline_id = execution_context&.pipeline_id || 'main'
+
     fill_hosts_from_cloud_id
     setup_ssl_params!

     @options = {
       :index => @index,
@@ -324,65 +328,39 @@
   private
   JOB_NAME = "run query"

   def do_run(output_queue)
     # if configured to run a single slice, don't bother spinning up threads
     if @slices.nil? || @slices <= 1
-      success, events = retryable_slice
-      success && events.each { |event| output_queue << event }
-      return
+      return retryable(JOB_NAME) do
+        do_run_slice(output_queue)
+      end
     end

     logger.warn("managed slices for query is very large (#{@slices}); consider reducing") if @slices > 8

-    slice_results = parallel_slice # array of tuple(ok, events)
-
-    # insert events to queue if all slices success
-    if slice_results.all?(&:first)
-      slice_results.flat_map { |success, events| events }
-        .each { |event| output_queue << event }
-    end
+    @slices.times.map do |slice_id|
+      Thread.new do
+        LogStash::Util::set_thread_name("[#{pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
+        retryable(JOB_NAME) do
+          do_run_slice(output_queue, slice_id)
+        end
+      end
+    end.map(&:join)

     logger.trace("#{@slices} slices completed")
   end

   def retryable(job_name, &block)
     begin
       stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
-      output = stud_try.try((@retries + 1).times) { yield }
-      [true, output]
+      stud_try.try((@retries + 1).times) { yield }
     rescue => e
       error_details = {:message => e.message, :cause => e.cause}
       error_details[:backtrace] = e.backtrace if logger.debug?
       logger.error("Tried #{job_name} unsuccessfully", error_details)
-      [false, nil]
     end
   end

-
-  # @return [(ok, events)] : Array of tuple(Boolean, [Logstash::Event])
-  def parallel_slice
-    pipeline_id = execution_context&.pipeline_id || 'main'
-    @slices.times.map do |slice_id|
-      Thread.new do
-        LogStash::Util::set_thread_name("[#{pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
-        retryable_slice(slice_id)
-      end
-    end.map do |t|
-      t.join
-      t.value
-    end
-  end
-
-  # @param scroll_id [Integer]
-  # @return (ok, events) [Boolean, Array(Logstash::Event)]
-  def retryable_slice(slice_id=nil)
-    retryable(JOB_NAME) do
-      output = []
-      do_run_slice(output, slice_id)
-      output
-    end
-  end
-
   def do_run_slice(output_queue, slice_id=nil)
     slice_query = @base_query
     slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @slices}) unless slice_id.nil?