lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-1.0.1 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-1.0.2

- old
+ new

@@ -139,50 +139,57 @@ end @client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options) end - private - def run_next(output_queue, scroll_id) - r = scroll_request(scroll_id) - r['hits']['hits'].each do |hit| - event = LogStash::Event.new(hit['_source']) - decorate(event) - - if @docinfo - event[@docinfo_target] ||= {} - - unless event[@docinfo_target].is_a?(Hash) - @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the `@metadata` field in the `_source` document, expected a hash got:", :metadata_type => event[@docinfo_target].class) - - raise Exception.new("Elasticsearch input: incompatible event") - end - - @docinfo_fields.each do |field| - event[@docinfo_target][field] = hit[field] - end - end - output_queue << event - end - - {:has_hits => r['hits']['hits'].any?, :scroll_id => r['_scroll_id']} - end - public def run(output_queue) # get first wave of data r = @client.search(@options) # since 'scan' doesn't return data on the search call, do an extra scroll if @scan - resp = run_next(output_queue, r['_scroll_id']) + r = process_next_scroll(output_queue, r['_scroll_id']) + has_hits = r['has_hits'] + else # not a scan, process the response + r['hits']['hits'].each { |hit| push_hit(hit, output_queue) } + has_hits = r['hits']['hits'].any? end - while resp[:has_hits] do - resp = run_next(output_queue, resp[:scroll_id]) + while has_hits do + r = process_next_scroll(output_queue, r['_scroll_id']) + has_hits = r['has_hits'] end end # def run + + private + def process_next_scroll(output_queue, scroll_id) + r = scroll_request(scroll_id) + r['hits']['hits'].each { |hit| push_hit(hit, output_queue) } + {'has_hits' => r['hits']['hits'].any?, '_scroll_id' => r['_scroll_id']} + end + + private + def push_hit(hit, output_queue) + event = LogStash::Event.new(hit['_source']) + decorate(event) + + if @docinfo + event[@docinfo_target] ||= {} + + unless event[@docinfo_target].is_a?(Hash) + @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the `@metadata` field in the `_source` document, expected a hash got:", :metadata_type => event[@docinfo_target].class) + + raise Exception.new("Elasticsearch input: incompatible event") + end + + @docinfo_fields.each do |field| + event[@docinfo_target][field] = hit[field] + end + end + output_queue << event + end private def scroll_request scroll_id @client.scroll(:body => scroll_id, :scroll => @scroll) end