lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.2.1 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.3.0
- old
+ new
@@ -1,8 +1,9 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
+require "logstash/json"
require "base64"
# .Compatibility Note
# [NOTE]
# ================================================================================
@@ -81,10 +82,14 @@
# This parameter controls the keepalive time in seconds of the scrolling
# request and initiates the scrolling process. The timeout applies per
# round trip (i.e. between one scroll request and the next).
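# For example, `scroll => "5m"` would keep each scroll context alive for up
# to five minutes between consecutive scroll requests.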
config :scroll, :validate => :string, :default => "1m"
+ # This parameter controls the number of parallel slices to be consumed by
+ # this pipeline input; each slice is scrolled on its own thread.
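+ #
+ # For example, a purely illustrative pipeline definition that consumes an
+ # index across four parallel sliced scrolls (the index and query shown
+ # here are hypothetical) might look like:
+ # [source,ruby]
+ #     input {
+ #       elasticsearch {
+ #         index  => "logs-*"
+ #         query  => '{ "query": { "match_all": {} } }'
+ #         slices => 4
+ #       }
+ #     }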
+ config :slices, :validate => :number
+
# If set, include Elasticsearch document information such as index, type, and
# the id in the event.
#
# It might be important to note, with regard to metadata, that if you're
# ingesting documents with the intent to re-index them (or just update them)
@@ -145,14 +150,18 @@
require "elasticsearch"
require "rufus/scheduler"
@options = {
:index => @index,
- :body => @query,
:scroll => @scroll,
:size => @size
}
+ @base_query = LogStash::Json.load(@query)
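+ # keep the parsed query as a hash so that a per-slice `slice` clause can
+ # be merged in later without mutating the user's original query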
+ if @slices
+ @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify a `slice` when the plugin is configured to manage parallel slices with the `slices` option")
+ @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
+ end
transport_options = {}
if @user && @password
token = Base64.strict_encode64("#{@user}:#{@password.value}")
@@ -194,20 +203,43 @@
end
private
def do_run(output_queue)
- # get first wave of data
- r = @client.search(@options)
+ # if configured to run a single slice, don't bother spinning up threads
+ return do_run_slice(output_queue) if @slices.nil? || @slices <= 1
+ logger.warn("number of managed slices for this query is very large (#{@slices}); consider reducing") if @slices > 8
+
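+ # otherwise, scroll each slice on a dedicated thread and wait for all of
+ # them to finish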
+ @slices.times.map do |slice_id|
+ Thread.new do
+ LogStash::Util::set_thread_name("#{@id}_slice_#{slice_id}")
+ do_run_slice(output_queue, slice_id)
+ end
+ end.map(&:join)
+ end
+
+ def do_run_slice(output_queue, slice_id=nil)
+ slice_query = @base_query
+ slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @slices }) unless slice_id.nil?
+
+ slice_options = @options.merge(:body => LogStash::Json.dump(slice_query))
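+ # e.g. with slices => 4, slice 0's request body carries the user's query
+ # plus {"slice":{"id":0,"max":4}}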
+
+ logger.info("Slice starting", slice_id: slice_id, slices: @slices) unless slice_id.nil?
+ r = search_request(slice_options)
+
r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
+ logger.debug("Slice progress", slice_id: slice_id, slices: @slices) unless slice_id.nil?
+
has_hits = r['hits']['hits'].any?
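+ # keep scrolling until there are no more hits, the scroll id disappears,
+ # or shutdown is requested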
- while has_hits && !stop?
+ while has_hits && r['_scroll_id'] && !stop?
r = process_next_scroll(output_queue, r['_scroll_id'])
+ logger.debug("Slice progress", slice_id: slice_id, slices: @slices) unless slice_id.nil?
has_hits = r['has_hits']
end
+ logger.info("Slice complete", slice_id: slice_id, slices: @slices) unless slice_id.nil?
end
def process_next_scroll(output_queue, scroll_id)
r = scroll_request(scroll_id)
r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
@@ -240,7 +272,11 @@
output_queue << event
end
def scroll_request(scroll_id)
@client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
+ end
+
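+ # thin wrapper over the client's search API, kept separate so that the
+ # initial request can be stubbed or overridden (e.g. in tests)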
+ def search_request(options)
+ @client.search(options)
end
end