lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.2.1 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.3.0

- old
+ new

@@ -1,8 +1,9 @@
 # encoding: utf-8
 require "logstash/inputs/base"
 require "logstash/namespace"
+require "logstash/json"
 require "base64"

 # .Compatibility Note
 # [NOTE]
 # ================================================================================
@@ -81,10 +82,14 @@
   # This parameter controls the keepalive time in seconds of the scrolling
   # request and initiates the scrolling process. The timeout applies per
   # round trip (i.e. between the previous scroll request, to the next).
   config :scroll, :validate => :string, :default => "1m"

+  # This parameter controls the number of parallel slices to be consumed simultaneously
+  # by this pipeline input.
+  config :slices, :validate => :number
+
   # If set, include Elasticsearch document information such as index, type, and
   # the id in the event.
   #
   # It might be important to note, with regards to metadata, that if you're
   # ingesting documents with the intent to re-index them (or just update them)
@@ -145,14 +150,18 @@
     require "elasticsearch"
     require "rufus/scheduler"

     @options = {
       :index => @index,
-      :body => @query,
       :scroll => @scroll,
       :size => @size
     }
+
+    @base_query = LogStash::Json.load(@query)
+    if @slices
+      @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
+      @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
+    end

     transport_options = {}

     if @user && @password
       token = Base64.strict_encode64("#{@user}:#{@password.value}")
@@ -194,20 +203,43 @@
   end

   private

   def do_run(output_queue)
-    # get first wave of data
-    r = @client.search(@options)
+    # if configured to run a single slice, don't bother spinning up threads
+    return do_run_slice(output_queue) if @slices.nil? || @slices <= 1

+    logger.warn("managed slices for query is very large (#{@slices}); consider reducing") if @slices > 8
+
+    @slices.times.map do |slice_id|
+      Thread.new do
+        LogStash::Util::set_thread_name("#{@id}_slice_#{slice_id}")
+        do_run_slice(output_queue, slice_id)
+      end
+    end.map(&:join)
+  end
+
+  def do_run_slice(output_queue, slice_id=nil)
+    slice_query = @base_query
+    slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @slices}) unless slice_id.nil?
+
+    slice_options = @options.merge(:body => LogStash::Json.dump(slice_query) )
+
+    logger.info("Slice starting", slice_id: slice_id, slices: @slices) unless slice_id.nil?
+    r = search_request(slice_options)
+
     r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
+    logger.debug("Slice progress", slice_id: slice_id, slices: @slices) unless slice_id.nil?
+
     has_hits = r['hits']['hits'].any?

-    while has_hits && !stop?
+    while has_hits && r['_scroll_id'] && !stop?
       r = process_next_scroll(output_queue, r['_scroll_id'])
+      logger.debug("Slice progress", slice_id: slice_id, slices: @slices) unless slice_id.nil?
       has_hits = r['has_hits']
     end
+
+    logger.info("Slice complete", slice_id: slice_id, slices: @slices) unless slice_id.nil?
   end

   def process_next_scroll(output_queue, scroll_id)
     r = scroll_request(scroll_id)
     r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
@@ -240,7 +272,11 @@
     output_queue << event
   end

   def scroll_request scroll_id
     @client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
+  end
+
+  def search_request(options)
+    @client.search(options)
   end
 end
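
Notes on the new register-time validation: the guards use Ruby's `condition && fail(...)` idiom, where `fail` is an alias of `raise`, so a truthy condition raises `LogStash::ConfigurationError` and aborts pipeline startup. A standalone approximation of that behavior, with stdlib `json` and a plain `ArgumentError` standing in for `LogStash::Json` and `LogStash::ConfigurationError`, and `validate_slices!` being a helper name invented for this sketch:

require "json"

def validate_slices!(query_json, slices)
  base_query = JSON.parse(query_json)
  return base_query if slices.nil? # option unset: single, unsliced scroll
  # Hash#include? tests keys, so a user-supplied top-level `slice` clause
  # would collide with plugin-managed slicing and is rejected.
  base_query.include?('slice') && fail(ArgumentError, "`query` cannot specify `slice` when `slices` is set")
  slices < 1 && fail(ArgumentError, "`slices` must be greater than zero, got `#{slices}`")
  base_query
end

validate_slices!('{"query":{"match_all":{}}}', 4) # parses cleanly
begin
  validate_slices!('{"slice":{"id":0,"max":2}}', 4)
rescue ArgumentError => e
  puts e.message
end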
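
The rewritten do_run fans out one thread per slice and joins them all, so the input returns only once every slice has been scrolled to completion; with `slices` unset (or 1) it short-circuits to a single in-line do_run_slice call and spawns no threads. A simplified sketch of the map/join pattern, with the worker body reduced to a placeholder:

threads = 3.times.map do |slice_id|
  Thread.new do
    Thread.current.name = "slice_#{slice_id}" # the plugin uses LogStash::Util::set_thread_name
    # placeholder for do_run_slice(output_queue, slice_id)
    puts "slice #{slice_id} drained"
  end
end
threads.each(&:join) # block until every slice finishes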
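
The slicing itself relies on Elasticsearch's sliced-scroll API: do_run_slice merges a `slice` clause (this slice's `id` out of `max` total) into the shared base query, so each worker scrolls an independent, non-overlapping partition of the result set. A minimal sketch of that merge, again using stdlib `json` in place of `LogStash::Json`:

require "json"

base_query = JSON.parse('{ "query": { "match_all": {} } }') # stand-in for @base_query
slices = 2 # stand-in for the new `slices` option

slices.times do |slice_id|
  slice_query = base_query.merge('slice' => { 'id' => slice_id, 'max' => slices })
  puts JSON.dump(slice_query)
end
# {"query":{"match_all":{}},"slice":{"id":0,"max":2}}
# {"query":{"match_all":{}},"slice":{"id":1,"max":2}}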