lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-2.0.2 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-2.0.3

- lines prefixed with `-` show the old version (2.0.2)
+ lines prefixed with `+` show the new version (2.0.3)

@@ -58,11 +58,11 @@ # the id in the event. # # It might be important to note, with regards to metadata, that if you're # ingesting documents with the intent to re-index them (or just update them) # that the `action` option in the elasticsearch output want's to know how to - # handle those things. It can be dynamically assigned with a field + # handle those things. It can be dynamically assigned with a field # added to the metadata. # # Example # [source, ruby] # input { @@ -84,11 +84,11 @@ # } # config :docinfo, :validate => :boolean, :default => false # Where to move the Elasticsearch document information by default we use the @metadata field. - config :docinfo_target, :validate=> :string, :default => "@metadata" + config :docinfo_target, :validate=> :string, :default => LogStash::Event::METADATA # List of document metadata to move to the `docinfo_target` field # To learn more about Elasticsearch metadata fields read # http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/_document_metadata.html config :docinfo_fields, :validate => :array, :default => ['_index', '_type', '_id'] @@ -103,11 +103,10 @@ config :ssl, :validate => :boolean, :default => false # SSL Certificate Authority file config :ca_file, :validate => :path - public def register require "elasticsearch" @options = { :index => @index, @@ -139,13 +138,11 @@ end @client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options) end - public def run(output_queue) - # get first wave of data r = @client.search(@options) # since 'scan' doesn't return data on the search call, do an extra scroll if @scan @@ -158,40 +155,44 @@ while has_hits && !stop? 
r = process_next_scroll(output_queue, r['_scroll_id']) has_hits = r['has_hits'] end - end # def run + end private + def process_next_scroll(output_queue, scroll_id) r = scroll_request(scroll_id) r['hits']['hits'].each { |hit| push_hit(hit, output_queue) } {'has_hits' => r['hits']['hits'].any?, '_scroll_id' => r['_scroll_id']} end - private def push_hit(hit, output_queue) event = LogStash::Event.new(hit['_source']) decorate(event) if @docinfo - event[@docinfo_target] ||= {} + # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event. + docinfo_target = event[@docinfo_target] || {} - unless event[@docinfo_target].is_a?(Hash) - @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the `@metadata` field in the `_source` document, expected a hash got:", :metadata_type => event[@docinfo_target].class) + unless docinfo_target.is_a?(Hash) + @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event) + # TODO: (colin) I am not sure raising is a good strategy here? raise Exception.new("Elasticsearch input: incompatible event") end @docinfo_fields.each do |field| - event[@docinfo_target][field] = hit[field] + docinfo_target[field] = hit[field] end + + event[@docinfo_target] = docinfo_target end + output_queue << event end - private def scroll_request scroll_id @client.scroll(:body => scroll_id, :scroll => @scroll) end -end # class LogStash::Inputs::Elasticsearch +end