module LogStash; module Outputs; class ElasticSearch
  # Data stream (DS) specific behavior/configuration for the Elasticsearch output.
  #
  # Including this module registers the `data_stream*` settings on the plugin class and
  # provides the helpers that decide whether the user's configuration targets a data stream.
  module DataStreamSupport

    # Hook invoked on `include`: declares the DS settings and installs the custom validators.
    #
    # @param base [Class] the plugin class including this module
    def self.included(base)
      # Defines whether data will be indexed into an Elasticsearch data stream,
      # `data_stream_*` settings will only be used if this setting is enabled!
      # This setting supports values `true`, `false`, and `auto`.
      # Defaults to `false` in Logstash 7.x and `auto` starting in Logstash 8.0.
      base.config :data_stream, :validate => ['true', 'false', 'auto']

      base.config :data_stream_type, :validate => ['logs', 'metrics', 'synthetics'], :default => 'logs'
      base.config :data_stream_dataset, :validate => :dataset_identifier, :default => 'generic'
      base.config :data_stream_namespace, :validate => :namespace_identifier, :default => 'default'

      base.config :data_stream_sync_fields, :validate => :boolean, :default => true
      base.config :data_stream_auto_routing, :validate => :boolean, :default => true

      base.extend(Validator)
    end

    # Memoized answer to "is this output configured to write to a data stream?".
    #
    # @note assumes to be running AFTER {after_successful_connection} completed, due ES version checks
    # @return [Boolean]
    # @raise [LogStash::ConfigurationError] when the configuration is ambiguous or invalid
    def data_stream_config?
      # `nil?` (not `||=`) so that a cached `false` is not re-computed
      @data_stream_config = check_data_stream_config! if @data_stream_config.nil?
      @data_stream_config
    end

    private

    # Resolves the target name for an event.
    #
    # @param event [#get] the event being indexed
    # @return [String, Object] "type-dataset-namespace", or `@index` when auto-routing is
    #   disabled or the event carries no `data_stream` Hash
    def data_stream_name(event)
      data_stream = event.get('data_stream')
      return @index if !data_stream_auto_routing || !data_stream.is_a?(Hash)

      type = data_stream['type'] || data_stream_type
      dataset = data_stream['dataset'] || data_stream_dataset
      namespace = data_stream['namespace'] || data_stream_namespace
      "#{type}-#{dataset}-#{namespace}"
    end

    # @param params the user configuration for the ES output
    # @note LS initialized configuration (with filled defaults) won't detect as data-stream
    #       compatible, only explicit (`original_params`) config should be tested.
    # @return [TrueClass|FalseClass] whether given configuration is data-stream compatible
    # @raise [LogStash::ConfigurationError] on ambiguous or unsupported settings
    def check_data_stream_config!(params = original_params)
      data_stream_params = params.select { |name, _| name.start_with?('data_stream_') } # exclude data_stream =>
      unsupported_params = invalid_data_stream_params(params)

      case data_stream_explicit_value
      when false
        if data_stream_params.any?
          @logger.error "Ambiguous configuration; data stream settings must not be present when data streams is disabled (caused by: `data_stream => false`)", data_stream_params
          raise LogStash::ConfigurationError, "Ambiguous configuration, please remove data stream specific settings: #{data_stream_params.keys}"
        end
        false
      when true
        if unsupported_params.any?
          @logger.error "Invalid data stream configuration, following parameters are not supported:", unsupported_params
          raise LogStash::ConfigurationError, "Invalid data stream configuration: #{unsupported_params.keys}"
        end
        true
      else
        use_data_stream = data_stream_default(data_stream_params, unsupported_params.empty?)
        if !use_data_stream && data_stream_params.any?
          # DS (auto) disabled but there's still some data-stream parameters (and no `data_stream => false`)
          @logger.error "Ambiguous configuration; data stream settings are present, but data streams are not enabled", data_stream_params
          raise LogStash::ConfigurationError, "Ambiguous configuration, please set data_stream => true " +
              "or remove data stream specific settings: #{data_stream_params.keys}"
        end
        use_data_stream
      end
    end

    # @return [true, false, nil] the user's explicit choice; nil means 'auto' or not set by user
    def data_stream_explicit_value
      case @data_stream
      when 'true' then true
      when 'false' then false
      end
    end

    # Filters the given configuration down to the settings that are NOT allowed together
    # with data streams.
    #
    # @param params [Hash{String=>Object}] the user configuration
    # @return [Hash{String=>Object}] offending settings (empty when DS compliant)
    def invalid_data_stream_params(params)
      shared_params = LogStash::PluginMixins::ElasticSearch::APIConfigs::CONFIG_PARAMS.keys.map(&:to_s)
      params.reject do |name, value|
        # NOTE: intentionally do not support explicit DS configuration like:
        # - `index => ...` identifier provided by data_stream_xxx settings
        # - `manage_template => false` implied by not setting the parameter
        case name
        when 'action'
          value == 'create'
        when 'routing', 'pipeline'
          true
        when 'data_stream'
          # an explicit `auto` must behave like an unset value, otherwise writing
          # `data_stream => auto` would by itself make the config "non compliant"
          %w[true auto].include?(value.to_s)
        else
          name.start_with?('data_stream_') ||
              shared_params.include?(name) ||
              inherited_internal_config_param?(name) # 'id', 'enabled_metric' etc
        end
      end
    end

    # @param name [String, Symbol] a setting name
    # @return [Boolean] whether the setting is declared by the plugin's superclass
    def inherited_internal_config_param?(name)
      self.class.superclass.get_config.key?(name.to_s) # superclass -> LogStash::Outputs::Base
    end

    DATA_STREAMS_ORIGIN_ES_VERSION = '7.9.0'

    # @return [Gem::Version] if ES supports DS nil (or raise) otherwise
    # @raise [LogStash::Error] when the detected ES version predates data streams
    def assert_es_version_supports_data_streams
      fail 'no last_es_version' unless last_es_version # assert - should not happen
      es_version = Gem::Version.create(last_es_version)
      if es_version < Gem::Version.create(DATA_STREAMS_ORIGIN_ES_VERSION)
        @logger.error "Elasticsearch version does not support data streams, Logstash might end up writing to an index", es_version: es_version.version
        # NOTE: when switching to synchronous check from register, this should be a ConfigurationError
        raise LogStash::Error, "A data_stream configuration is only supported since Elasticsearch #{DATA_STREAMS_ORIGIN_ES_VERSION} " +
            "(detected version #{es_version.version}), please upgrade your cluster"
      end
      es_version # return truthy
    end

    DATA_STREAMS_ENABLED_BY_DEFAULT_LS_VERSION = '8.0.0'

    # Decides the DS mode when data_stream => is either 'auto' or not set.
    #
    # @param data_stream_params [Hash] the explicit `data_stream_*` settings
    # @param valid_data_stream_config [Boolean] whether no DS-incompatible setting is present
    # @return [Boolean] whether data streams should be used
    def data_stream_default(data_stream_params, valid_data_stream_config)
      ds_default = Gem::Version.create(LOGSTASH_VERSION) >= Gem::Version.create(DATA_STREAMS_ENABLED_BY_DEFAULT_LS_VERSION)

      if ds_default # LS 8.0
        return false unless valid_data_stream_config

        @logger.debug 'Configuration is data stream compliant'
        return true
      end

      # LS 7.x
      if valid_data_stream_config && !data_stream_params.any?
        @logger.warn "Configuration is data stream compliant but due backwards compatibility Logstash 7.x will not assume " +
                     "writing to a data-stream, default behavior will change on Logstash 8.0 " +
                     "(set `data_stream => true/false` to disable this warning)"
      end
      false
    end

    # an {event_action_tuple} replacement when a data-stream configuration is detected
    #
    # @param event the event to index
    # @return [EventActionTuple] a 'create' action carrying the (optionally synced) event data
    def data_stream_event_action_tuple(event)
      event_data = event.to_hash
      data_stream_event_sync(event_data) if data_stream_sync_fields
      EventActionTuple.new('create', common_event_params(event), event, event_data)
    end

    DATA_STREAM_SYNC_FIELDS = [ 'type', 'dataset', 'namespace' ].freeze

    # Makes sure the event's `data_stream` field reflects the data stream written to.
    # Mutates (and returns) the given Hash.
    #
    # @param event_data [Hash] the event's data (as returned by `event.to_hash`)
    # @return [Hash] the same `event_data`
    def data_stream_event_sync(event_data)
      data_stream = event_data['data_stream']
      if data_stream.is_a?(Hash)
        unless data_stream_auto_routing
          # without auto-routing the configured values win over what the event carries
          out_of_sync = DATA_STREAM_SYNC_FIELDS.select do |name|
            data_stream.key?(name) && data_stream[name] != send(:"data_stream_#{name}")
          end
          if out_of_sync.any? # these fields will need to be overwritten
            info = out_of_sync.each_with_object({}) { |name, details| details[name] = data_stream[name] }
            info[:event] = event_data
            @logger.warn "Some data_stream fields are out of sync, these will be updated to reflect data-stream name", info

            # NOTE: we work directly with event.to_hash data thus fine to mutate the 'data_stream' hash
            out_of_sync.each { |name| data_stream[name] = nil } # fallback to ||= below
          end
        end
      else
        unless data_stream.nil?
          @logger.warn "Invalid 'data_stream' field type, due fields sync will overwrite", value: data_stream, event: event_data
        end
        event_data['data_stream'] = data_stream = {}
      end

      data_stream['type'] ||= data_stream_type
      data_stream['dataset'] ||= data_stream_dataset
      data_stream['namespace'] ||= data_stream_namespace

      event_data
    end

    # Class-level validators for the `:dataset_identifier` / `:namespace_identifier` settings.
    module Validator

      # cannot include \, /, *, ?, ", <, >, |, ' ' (space char), ',', #, :
      INVALID_IDENTIFIER_CHARS = [ '\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#', ':' ].freeze
      # compiled once instead of on every validation
      INVALID_IDENTIFIER_PATTERN = Regexp.union(INVALID_IDENTIFIER_CHARS).freeze
      private_constant :INVALID_IDENTIFIER_CHARS, :INVALID_IDENTIFIER_PATTERN

      # @override {LogStash::Config::Mixin::validate_value} to handle custom validators
      # @param value [Array<Object>]
      # @param validator [nil,Array,Symbol]
      # @return [Array(true,Object)]: if validation is a success, a tuple containing `true` and the coerced value
      # @return [Array(false,String)]: if validation is a failure, a tuple containing `false` and the failure reason.
      def validate_value(value, validator)
        case validator
        when :dataset_identifier   then validate_dataset_identifier(value)
        when :namespace_identifier then validate_namespace_identifier(value)
        else super
        end
      end

      private

      # @return [Array(Boolean,Object)] see {#validate_value}
      def validate_dataset_identifier(value)
        validate_string_identifier(value)
      end

      # @return [Array(Boolean,Object)] see {#validate_value}
      def validate_namespace_identifier(value)
        validate_string_identifier(value)
      end

      # Coerces the raw setting using the stock `:string` validator, then checks it as an identifier.
      def validate_string_identifier(value)
        valid, value = validate_value(value, :string)
        return false, value unless valid

        validate_identifier(value)
      end

      # @param value [String] the coerced identifier
      # @param max_size [Integer] maximum allowed size in bytes
      # @return [Array(Boolean,String)] `[true, value]` or `[false, reason]`
      def validate_identifier(value, max_size = 100)
        if value.empty?
          return false, "Invalid identifier - empty string"
        end
        if value.bytesize > max_size
          return false, "Invalid identifier - too long (#{value.bytesize} bytes)"
        end
        if value.match?(INVALID_IDENTIFIER_PATTERN)
          return false, "Invalid characters detected #{INVALID_IDENTIFIER_CHARS.inspect} are not allowed"
        end
        return true, value
      end

    end

  end
end; end; end