lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.9.3 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.10.0
- old
+ new
@@ -2,10 +2,13 @@
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/json"
require "logstash/util/safe_uri"
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
+require 'logstash/plugin_mixins/event_support/event_factory_adapter'
+require 'logstash/plugin_mixins/ecs_compatibility_support'
+require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
require "base64"
require "elasticsearch"
require "elasticsearch/transport/transport/http/manticore"
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
@@ -64,16 +67,20 @@
#
# Further documentation describing this syntax can be found https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].
#
#
class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
+
+ include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
+ include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
+
+ include LogStash::PluginMixins::EventSupport::EventFactoryAdapter
+
extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
config_name "elasticsearch"
- default :codec, "json"
-
# List of elasticsearch hosts to use for querying.
# Each host can be either IP, HOST, IP:port or HOST:port.
# Port defaults to 9200
config :hosts, :validate => :array
@@ -126,12 +133,13 @@
# }
# }
#
config :docinfo, :validate => :boolean, :default => false
- # Where to move the Elasticsearch document information. By default we use the @metadata field.
- config :docinfo_target, :validate=> :string, :default => LogStash::Event::METADATA
+ # Where to move the Elasticsearch document information.
+ # default: [@metadata][input][elasticsearch] in ECS mode, @metadata field otherwise
+ config :docinfo_target, :validate=> :field_reference
# List of document metadata to move to the `docinfo_target` field.
# To learn more about Elasticsearch metadata fields read
# http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/_document_metadata.html
config :docinfo_fields, :validate => :array, :default => ['_index', '_type', '_id']
@@ -182,10 +190,18 @@
config :schedule, :validate => :string
# If set, the _source of each hit will be added nested under the target instead of at the top-level
config :target, :validate => :field_reference
+ def initialize(params={})
+ super(params)
+
+ if docinfo_target.nil?
+ @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
+ end
+ end
+
def register
require "rufus/scheduler"
@options = {
:index => @index,
@@ -295,51 +311,45 @@
r = scroll_request(scroll_id)
r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
[r['hits']['hits'].any?, r['_scroll_id']]
rescue => e
# this will typically be triggered by a scroll timeout
- logger.error("Scroll request error, aborting scroll", error: e.inspect)
+ logger.error("Scroll request error, aborting scroll", message: e.message, exception: e.class)
# return no hits and original scroll_id so we can try to clear it
[false, scroll_id]
end
def push_hit(hit, output_queue)
- if @target.nil?
- event = LogStash::Event.new(hit['_source'])
- else
- event = LogStash::Event.new
- event.set(@target, hit['_source'])
- end
+ event = targeted_event_factory.new_event hit['_source']
+ set_docinfo_fields(hit, event) if @docinfo
+ decorate(event)
+ output_queue << event
+ end
- if @docinfo
- # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
- docinfo_target = event.get(@docinfo_target) || {}
+ def set_docinfo_fields(hit, event)
+ # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
+ docinfo_target = event.get(@docinfo_target) || {}
- unless docinfo_target.is_a?(Hash)
- @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event)
+ unless docinfo_target.is_a?(Hash)
+ @logger.error("Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event.to_hash_with_metadata)
- # TODO: (colin) I am not sure raising is a good strategy here?
- raise Exception.new("Elasticsearch input: incompatible event")
- end
+ # TODO: (colin) I am not sure raising is a good strategy here?
+ raise Exception.new("Elasticsearch input: incompatible event")
+ end
- @docinfo_fields.each do |field|
- docinfo_target[field] = hit[field]
- end
-
- event.set(@docinfo_target, docinfo_target)
+ @docinfo_fields.each do |field|
+ docinfo_target[field] = hit[field]
end
- decorate(event)
-
- output_queue << event
+ event.set(@docinfo_target, docinfo_target)
end
def clear_scroll(scroll_id)
@client.clear_scroll(scroll_id: scroll_id) if scroll_id
rescue => e
# ignore & log any clear_scroll errors
- logger.warn("Ignoring clear_scroll exception", message: e.message)
+ logger.warn("Ignoring clear_scroll exception", message: e.message, exception: e.class)
end
def scroll_request scroll_id
@client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
end
@@ -444,9 +454,12 @@
rescue ArgumentError => e
raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Auth/i, 'cloud_auth')
end
[ cloud_auth.username, cloud_auth.password ]
end
+
+ # @private used by unit specs
+ attr_reader :client
module URIOrEmptyValidator
##
# @override to provide :uri_or_empty validator
# @param value [Array<Object>]