lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.14.0 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.15.0
- old
+ new
@@ -8,10 +8,11 @@
require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
require 'logstash/plugin_mixins/ca_trusted_fingerprint_support'
require "logstash/plugin_mixins/scheduler"
require "base64"
+require 'logstash/helpers/loggable_try'
require "elasticsearch"
require "elasticsearch/transport/transport/http/manticore"
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
require_relative "elasticsearch/patches/_elasticsearch_transport_connections_selector"
@@ -97,10 +98,13 @@
config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }'
# This allows you to set the maximum number of hits returned per scroll.
config :size, :validate => :number, :default => 1000
+ # The number of times to retry a failed query. If the query still fails after all retries, an error message is logged.
+ config :retries, :validate => :number, :default => 0
+
# This parameter controls the keepalive time in seconds of the scrolling
# request and initiates the scrolling process. The timeout applies per
# round trip (i.e. between the previous scroll request, to the next).
config :scroll, :validate => :string, :default => "1m"
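For illustration, a minimal standalone Ruby sketch (not plugin code; `with_retries` is a hypothetical helper) of the documented semantics: `retries` must be non-negative, and a value of N permits N + 1 attempts in total:

```ruby
# Sketch of the documented `retries` semantics, assuming a block that
# stands in for the plugin's search request.
def with_retries(retries)
  raise ArgumentError, "`retries` must be >= 0, got #{retries}" if retries < 0
  attempts = 0
  begin
    attempts += 1
    yield attempts                # the "query"; may raise on transient failures
  rescue
    retry if attempts <= retries  # i.e. retries + 1 attempts in total
    raise                         # exhausted: surface the last error
  end
end

# A query that fails twice, then succeeds: passes with retries => 2.
result = with_retries(2) { |n| raise "transient" if n < 3; "ok on attempt #{n}" }
puts result  # => "ok on attempt 3"
```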
@@ -219,10 +223,12 @@
if @slices
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
end
+ @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal to or greater than zero, got `#{@retries}`")
+
validate_authentication
fill_user_password_from_cloud_auth
fill_hosts_from_cloud_id
@@ -260,26 +266,71 @@
do_run(output_queue)
end
end
private
-
+ JOB_NAME = "run query"
def do_run(output_queue)
# if configured to run a single slice, don't bother spinning up threads
- return do_run_slice(output_queue) if @slices.nil? || @slices <= 1
+ if @slices.nil? || @slices <= 1
+ success, events = retryable_slice
+ success && events.each { |event| output_queue << event }
+ return
+ end
logger.warn("managed slices for query is very large (#{@slices}); consider reducing") if @slices > 8
+ slice_results = parallel_slice # array of (success, events) tuples
+
+ # push events to the output queue only if all slices succeeded
+ if slice_results.all?(&:first)
+ slice_results.flat_map { |_success, events| events }
+ .each { |event| output_queue << event }
+ end
+
+ logger.trace("#{@slices} slices completed")
+ end
+
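The aggregation above is all-or-nothing: events reach the output queue only when every slice reports success. A standalone sketch of that tuple handling, using sample data rather than real events:

```ruby
# Each slice yields a (success, events) tuple; publish only if all succeeded.
slice_results = [
  [true, ["event-a", "event-b"]],
  [true, ["event-c"]],
]
if slice_results.all?(&:first)
  slice_results.flat_map { |_success, events| events }
               .each { |event| puts "queued: #{event}" }
end
```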
+ def retryable(job_name, &block)
+ begin
+ stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
+ output = stud_try.try((@retries + 1).times) { yield }
+ [true, output]
+ rescue => e
+ error_details = {:message => e.message, :cause => e.cause}
+ error_details[:backtrace] = e.backtrace if logger.debug?
+ logger.error("Tried #{job_name} unsuccessfully", error_details)
+ [false, nil]
+ end
+ end
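`LoggableTry` (required at the top of the file, from logstash-core) is a logging wrapper around `Stud::Try`. A minimal sketch of the underlying mechanics with plain `Stud.try`: it iterates an enumerable, one element per attempt, so `(@retries + 1).times` caps the query at `retries + 1` attempts. The failing-then-succeeding block here is purely illustrative:

```ruby
require 'stud/try'

attempt = 0
result = Stud.try(3.times) do   # up to 3 attempts, i.e. retries => 2
  attempt += 1
  raise "transient failure" if attempt < 3
  "succeeded on attempt #{attempt}"
end
puts result  # => "succeeded on attempt 3"; Stud re-raises once attempts run out
```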
+
+ # @return [Array<Array(Boolean, Array<LogStash::Event>)>] one (success, events) tuple per slice
+ def parallel_slice
pipeline_id = execution_context&.pipeline_id || 'main'
@slices.times.map do |slice_id|
Thread.new do
LogStash::Util::set_thread_name("[#{pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
- do_run_slice(output_queue, slice_id)
+ retryable_slice(slice_id)
end
- end.map(&:join)
+ end.map do |t|
+ t.join
+ t.value
+ end
end
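`Thread#value` waits for the thread to finish (it joins internally) and returns the block's result, which is how each slice thread hands its tuple back to the caller. A standalone sketch of the pattern:

```ruby
# Three worker threads, each returning a (success, events) tuple.
threads = 3.times.map do |slice_id|
  Thread.new { [true, ["event-from-slice-#{slice_id}"]] }
end
results = threads.map(&:value)  # #value implies #join, then returns the result
p results.all?(&:first)         # => true
```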
+ # @param slice_id [Integer, nil] the slice to run, or nil when slicing is not configured
+ # @return [Array(Boolean, Array<LogStash::Event>)] a (success, events) tuple
+ def retryable_slice(slice_id=nil)
+ retryable(JOB_NAME) do
+ output = []
+ do_run_slice(output, slice_id)
+ output
+ end
+ end
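Note the design shift here: `do_run_slice` only needs a sink that responds to `#<<`, so during a retryable attempt a plain Array buffers the hits, and nothing is published if the attempt fails. A standalone sketch of that buffering pattern (the `emit` method is hypothetical):

```ruby
def emit(sink)            # stands in for do_run_slice pushing hits via #<<
  sink << "event-1"
  sink << "event-2"
end

buffer = []               # per-attempt buffer; discarded if the attempt fails
emit(buffer)

queue = Queue.new         # stands in for the pipeline output queue
buffer.each { |event| queue << event }  # published only after success
puts queue.size           # => 2
```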
+
def do_run_slice(output_queue, slice_id=nil)
slice_query = @base_query
slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @slices}) unless slice_id.nil?
slice_options = @options.merge(:body => LogStash::Json.dump(slice_query) )
@@ -312,15 +363,10 @@
#
def process_next_scroll(output_queue, scroll_id)
r = scroll_request(scroll_id)
r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
[r['hits']['hits'].any?, r['_scroll_id']]
- rescue => e
- # this will typically be triggered by a scroll timeout
- logger.error("Scroll request error, aborting scroll", message: e.message, exception: e.class)
- # return no hits and original scroll_id so we can try to clear it
- [false, scroll_id]
end
def push_hit(hit, output_queue)
event = targeted_event_factory.new_event hit['_source']
set_docinfo_fields(hit, event) if @docinfo
@@ -351,10 +397,10 @@
rescue => e
# ignore & log any clear_scroll errors
logger.warn("Ignoring clear_scroll exception", message: e.message, exception: e.class)
end
- def scroll_request scroll_id
+ def scroll_request(scroll_id)
@client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
end
def search_request(options)
@client.search(options)