lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-1.0.1 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-1.0.2
- old
+ new
@@ -139,50 +139,57 @@
end
@client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options)
end
- private
- def run_next(output_queue, scroll_id)
- r = scroll_request(scroll_id)
- r['hits']['hits'].each do |hit|
- event = LogStash::Event.new(hit['_source'])
- decorate(event)
-
- if @docinfo
- event[@docinfo_target] ||= {}
-
- unless event[@docinfo_target].is_a?(Hash)
- @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the `@metadata` field in the `_source` document, expected a hash got:", :metadata_type => event[@docinfo_target].class)
-
- raise Exception.new("Elasticsearch input: incompatible event")
- end
-
- @docinfo_fields.each do |field|
- event[@docinfo_target][field] = hit[field]
- end
- end
- output_queue << event
- end
-
- {:has_hits => r['hits']['hits'].any?, :scroll_id => r['_scroll_id']}
- end
-
public
def run(output_queue)
# get first wave of data
r = @client.search(@options)
# since 'scan' doesn't return data on the search call, do an extra scroll
if @scan
- resp = run_next(output_queue, r['_scroll_id'])
+ r = process_next_scroll(output_queue, r['_scroll_id'])
+ has_hits = r['has_hits']
+ else # not a scan, process the response
+ r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
+ has_hits = r['hits']['hits'].any?
end
- while resp[:has_hits] do
- resp = run_next(output_queue, resp[:scroll_id])
+ while has_hits do
+ r = process_next_scroll(output_queue, r['_scroll_id'])
+ has_hits = r['has_hits']
end
end # def run
+
+ private
+ def process_next_scroll(output_queue, scroll_id)
+ r = scroll_request(scroll_id)
+ r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
+ {'has_hits' => r['hits']['hits'].any?, '_scroll_id' => r['_scroll_id']}
+ end
+
+ private
+ def push_hit(hit, output_queue)
+ event = LogStash::Event.new(hit['_source'])
+ decorate(event)
+
+ if @docinfo
+ event[@docinfo_target] ||= {}
+
+ unless event[@docinfo_target].is_a?(Hash)
+ @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the `@metadata` field in the `_source` document, expected a hash got:", :metadata_type => event[@docinfo_target].class)
+
+ raise Exception.new("Elasticsearch input: incompatible event")
+ end
+
+ @docinfo_fields.each do |field|
+ event[@docinfo_target][field] = hit[field]
+ end
+ end
+ output_queue << event
+ end
private
def scroll_request scroll_id
@client.scroll(:body => scroll_id, :scroll => @scroll)
end