lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.9.3 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.10.0

- old
+ new

@@ -2,10 +2,13 @@ require "logstash/inputs/base" require "logstash/namespace" require "logstash/json" require "logstash/util/safe_uri" require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter' +require 'logstash/plugin_mixins/event_support/event_factory_adapter' +require 'logstash/plugin_mixins/ecs_compatibility_support' +require 'logstash/plugin_mixins/ecs_compatibility_support/target_check' require "base64" require "elasticsearch" require "elasticsearch/transport/transport/http/manticore" require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore" @@ -64,16 +67,20 @@ # # Further documentation describing this syntax can be found https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here]. # # class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base + + include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) + include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck + + include LogStash::PluginMixins::EventSupport::EventFactoryAdapter + extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter config_name "elasticsearch" - default :codec, "json" - # List of elasticsearch hosts to use for querying. # Each host can be either IP, HOST, IP:port or HOST:port. # Port defaults to 9200 config :hosts, :validate => :array @@ -126,12 +133,13 @@ # } # } # config :docinfo, :validate => :boolean, :default => false - # Where to move the Elasticsearch document information. By default we use the @metadata field. - config :docinfo_target, :validate=> :string, :default => LogStash::Event::METADATA + # Where to move the Elasticsearch document information. + # default: [@metadata][input][elasticsearch] in ECS mode, @metadata field otherwise + config :docinfo_target, :validate=> :field_reference # List of document metadata to move to the `docinfo_target` field. # To learn more about Elasticsearch metadata fields read # http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/_document_metadata.html config :docinfo_fields, :validate => :array, :default => ['_index', '_type', '_id'] @@ -182,10 +190,18 @@ config :schedule, :validate => :string # If set, the _source of each hit will be added nested under the target instead of at the top-level config :target, :validate => :field_reference + def initialize(params={}) + super(params) + + if docinfo_target.nil? + @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]'] + end + end + def register require "rufus/scheduler" @options = { :index => @index, @@ -295,51 +311,45 @@ r = scroll_request(scroll_id) r['hits']['hits'].each { |hit| push_hit(hit, output_queue) } [r['hits']['hits'].any?, r['_scroll_id']] rescue => e # this will typically be triggered by a scroll timeout - logger.error("Scroll request error, aborting scroll", error: e.inspect) + logger.error("Scroll request error, aborting scroll", message: e.message, exception: e.class) # return no hits and original scroll_id so we can try to clear it [false, scroll_id] end def push_hit(hit, output_queue) - if @target.nil? - event = LogStash::Event.new(hit['_source']) - else - event = LogStash::Event.new - event.set(@target, hit['_source']) - end + event = targeted_event_factory.new_event hit['_source'] + set_docinfo_fields(hit, event) if @docinfo + decorate(event) + output_queue << event + end - if @docinfo - # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event. - docinfo_target = event.get(@docinfo_target) || {} + def set_docinfo_fields(hit, event) + # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event. + docinfo_target = event.get(@docinfo_target) || {} - unless docinfo_target.is_a?(Hash) - @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event) + unless docinfo_target.is_a?(Hash) + @logger.error("Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event.to_hash_with_metadata) - # TODO: (colin) I am not sure raising is a good strategy here? - raise Exception.new("Elasticsearch input: incompatible event") - end + # TODO: (colin) I am not sure raising is a good strategy here? + raise Exception.new("Elasticsearch input: incompatible event") + end - @docinfo_fields.each do |field| - docinfo_target[field] = hit[field] - end - - event.set(@docinfo_target, docinfo_target) + @docinfo_fields.each do |field| + docinfo_target[field] = hit[field] end - decorate(event) - - output_queue << event + event.set(@docinfo_target, docinfo_target) end def clear_scroll(scroll_id) @client.clear_scroll(scroll_id: scroll_id) if scroll_id rescue => e # ignore & log any clear_scroll errors - logger.warn("Ignoring clear_scroll exception", message: e.message) + logger.warn("Ignoring clear_scroll exception", message: e.message, exception: e.class) end def scroll_request scroll_id @client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll) end @@ -444,9 +454,12 @@ rescue ArgumentError => e raise LogStash::ConfigurationError, e.message.to_s.sub(/Cloud Auth/i, 'cloud_auth') end [ cloud_auth.username, cloud_auth.password ] end + + # @private used by unit specs + attr_reader :client module URIOrEmptyValidator ## # @override to provide :uri_or_empty validator # @param value [Array<Object>]