lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.17.0 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.17.1
- old
+ new
@@ -254,10 +254,12 @@
config :target, :validate => :field_reference
# config :ca_trusted_fingerprint, :validate => :sha_256_hex
include LogStash::PluginMixins::CATrustedFingerprintSupport
+ attr_reader :pipeline_id
+
def initialize(params={})
super(params)
if docinfo_target.nil?
@docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
@@ -265,10 +267,12 @@
end
def register
require "rufus/scheduler"
+ @pipeline_id = execution_context&.pipeline_id || 'main'
+
fill_hosts_from_cloud_id
setup_ssl_params!
@options = {
:index => @index,
@@ -324,65 +328,39 @@
private
JOB_NAME = "run query"
def do_run(output_queue)
# if configured to run a single slice, don't bother spinning up threads
if @slices.nil? || @slices <= 1
- success, events = retryable_slice
- success && events.each { |event| output_queue << event }
- return
+ return retryable(JOB_NAME) do
+ do_run_slice(output_queue)
+ end
end
logger.warn("managed slices for query is very large (#{@slices}); consider reducing") if @slices > 8
- slice_results = parallel_slice # array of tuple(ok, events)
- # insert events to queue if all slices success
- if slice_results.all?(&:first)
- slice_results.flat_map { |success, events| events }
- .each { |event| output_queue << event }
- end
+ @slices.times.map do |slice_id|
+ Thread.new do
+ LogStash::Util::set_thread_name("[#{pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
+ retryable(JOB_NAME) do
+ do_run_slice(output_queue, slice_id)
+ end
+ end
+ end.map(&:join)
logger.trace("#{@slices} slices completed")
end
def retryable(job_name, &block)
begin
stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
- output = stud_try.try((@retries + 1).times) { yield }
- [true, output]
+ stud_try.try((@retries + 1).times) { yield }
rescue => e
error_details = {:message => e.message, :cause => e.cause}
error_details[:backtrace] = e.backtrace if logger.debug?
logger.error("Tried #{job_name} unsuccessfully", error_details)
- [false, nil]
end
end
-
-
- # @return [(ok, events)] : Array of tuple(Boolean, [Logstash::Event])
- def parallel_slice
- pipeline_id = execution_context&.pipeline_id || 'main'
- @slices.times.map do |slice_id|
- Thread.new do
- LogStash::Util::set_thread_name("[#{pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
- retryable_slice(slice_id)
- end
- end.map do |t|
- t.join
- t.value
- end
- end
-
- # @param scroll_id [Integer]
- # @return (ok, events) [Boolean, Array(Logstash::Event)]
- def retryable_slice(slice_id=nil)
- retryable(JOB_NAME) do
- output = []
- do_run_slice(output, slice_id)
- output
- end
- end
-
def do_run_slice(output_queue, slice_id=nil)
slice_query = @base_query
slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @slices}) unless slice_id.nil?