lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.19.1 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.20.0

- old
+ new

@@ -72,10 +72,11 @@ # # class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base require 'logstash/inputs/elasticsearch/paginated_search' + require 'logstash/inputs/elasticsearch/aggregation' include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck include LogStash::PluginMixins::EventSupport::EventFactoryAdapter @@ -99,10 +100,15 @@ # The query to be executed. Read the Elasticsearch query DSL documentation # for more info # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }' + # This allows you to specify the response type: either hits or aggregations + # where hits: normal search request + # aggregations: aggregation request + config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits' + # This allows you to set the maximum number of hits returned per scroll. config :size, :validate => :number, :default => 1000 # The number of retries to run the query. If the query fails after all retries, it logs an error message. config :retries, :validate => :number, :default => 0 @@ -280,15 +286,10 @@ @pipeline_id = execution_context&.pipeline_id || 'main' fill_hosts_from_cloud_id setup_ssl_params! 
- @options = { - :index => @index, - :scroll => @scroll, - :size => @size - } @base_query = LogStash::Json.load(@query) if @slices @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option") @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`") end @@ -326,25 +327,29 @@ setup_serverless setup_search_api + setup_query_executor + @client end - def run(output_queue) if @schedule - scheduler.cron(@schedule) { @paginated_search.do_run(output_queue) } + scheduler.cron(@schedule) { @query_executor.do_run(output_queue) } scheduler.join else - @paginated_search.do_run(output_queue) + @query_executor.do_run(output_queue) end end - def push_hit(hit, output_queue) - event = targeted_event_factory.new_event hit['_source'] + ## + # This can be called externally from the query_executor + public + def push_hit(hit, output_queue, root_field = '_source') + event = targeted_event_factory.new_event hit[root_field] set_docinfo_fields(hit, event) if @docinfo decorate(event) output_queue << event end @@ -641,16 +646,23 @@ api else @search_api end + end - @paginated_search = if @resolved_search_api == "search_after" + def setup_query_executor + @query_executor = case @response_type + when 'hits' + if @resolved_search_api == "search_after" LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self) else logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8 LogStash::Inputs::Elasticsearch::Scroll.new(@client, self) end + when 'aggregations' + LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self) + end end module URIOrEmptyValidator ## # @override to provide :uri_or_empty validator