lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-2.0.2 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-2.0.3
- old
+ new
@@ -58,11 +58,11 @@
# the id in the event.
#
# It might be important to note, with regards to metadata, that if you're
# ingesting documents with the intent to re-index them (or just update them)
# that the `action` option in the elasticsearch output wants to know how to
- # handle those things. It can be dynamically assigned with a field
+ # handle those things. It can be dynamically assigned with a field
# added to the metadata.
#
# Example
# [source, ruby]
# input {
@@ -84,11 +84,11 @@
# }
#
config :docinfo, :validate => :boolean, :default => false
# Where to move the Elasticsearch document information. By default we use the @metadata field.
- config :docinfo_target, :validate=> :string, :default => "@metadata"
+ config :docinfo_target, :validate=> :string, :default => LogStash::Event::METADATA
# List of document metadata to move to the `docinfo_target` field
# To learn more about Elasticsearch metadata fields read
# http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/_document_metadata.html
config :docinfo_fields, :validate => :array, :default => ['_index', '_type', '_id']
@@ -103,11 +103,10 @@
config :ssl, :validate => :boolean, :default => false
# SSL Certificate Authority file
config :ca_file, :validate => :path
- public
def register
require "elasticsearch"
@options = {
:index => @index,
@@ -139,13 +138,11 @@
end
@client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options)
end
- public
def run(output_queue)
-
# get first wave of data
r = @client.search(@options)
# since 'scan' doesn't return data on the search call, do an extra scroll
if @scan
@@ -158,40 +155,44 @@
while has_hits && !stop?
r = process_next_scroll(output_queue, r['_scroll_id'])
has_hits = r['has_hits']
end
- end # def run
+ end
private
+
# Fetch the next page of results for the given scroll id, push every hit
# onto the output queue, and report whether the scroll produced any hits
# (callers use 'has_hits' to decide whether to keep scrolling).
def process_next_scroll(output_queue, scroll_id)
  response = scroll_request(scroll_id)
  hits = response['hits']['hits']
  hits.each do |hit|
    push_hit(hit, output_queue)
  end
  { 'has_hits' => hits.any?, '_scroll_id' => response['_scroll_id'] }
end
- private
def push_hit(hit, output_queue)
event = LogStash::Event.new(hit['_source'])
decorate(event)
if @docinfo
- event[@docinfo_target] ||= {}
+ # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
+ docinfo_target = event[@docinfo_target] || {}
- unless event[@docinfo_target].is_a?(Hash)
- @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the `@metadata` field in the `_source` document, expected a hash got:", :metadata_type => event[@docinfo_target].class)
+ unless docinfo_target.is_a?(Hash)
+ @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event)
+ # TODO: (colin) I am not sure raising is a good strategy here?
raise Exception.new("Elasticsearch input: incompatible event")
end
@docinfo_fields.each do |field|
- event[@docinfo_target][field] = hit[field]
+ docinfo_target[field] = hit[field]
end
+
+ event[@docinfo_target] = docinfo_target
end
+
output_queue << event
end
- private
# Issue one scroll continuation request to Elasticsearch for the given
# scroll id, keeping the scroll context alive for the configured @scroll
# window. Returns the raw client response hash.
def scroll_request(scroll_id)
  @client.scroll(:body => scroll_id, :scroll => @scroll)
end
-end # class LogStash::Inputs::Elasticsearch
+end