lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.19.1 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.20.0
- old
+ new
@@ -72,10 +72,11 @@
#
#
class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
require 'logstash/inputs/elasticsearch/paginated_search'
+ require 'logstash/inputs/elasticsearch/aggregation'
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
include LogStash::PluginMixins::EventSupport::EventFactoryAdapter
@@ -99,10 +100,15 @@
# The query to be executed. Read the Elasticsearch query DSL documentation
# for more info
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }'
+ # This allows you to specify the response type, either `hits` or `aggregations`:
+ #   hits: a normal search request
+ #   aggregations: an aggregation request
+ config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits'
+
# This allows you to set the maximum number of hits returned per scroll.
config :size, :validate => :number, :default => 1000
# The number of retries to run the query. If the query fails after all retries, it logs an error message.
config :retries, :validate => :number, :default => 0
@@ -280,15 +286,10 @@
@pipeline_id = execution_context&.pipeline_id || 'main'
fill_hosts_from_cloud_id
setup_ssl_params!
- @options = {
- :index => @index,
- :scroll => @scroll,
- :size => @size
- }
@base_query = LogStash::Json.load(@query)
if @slices
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
end
@@ -326,25 +327,29 @@
setup_serverless
setup_search_api
+ setup_query_executor
+
@client
end
-
def run(output_queue)
if @schedule
- scheduler.cron(@schedule) { @paginated_search.do_run(output_queue) }
+ scheduler.cron(@schedule) { @query_executor.do_run(output_queue) }
scheduler.join
else
- @paginated_search.do_run(output_queue)
+ @query_executor.do_run(output_queue)
end
end
- def push_hit(hit, output_queue)
- event = targeted_event_factory.new_event hit['_source']
+ ##
+ # This can be called externally from the query_executor
+ public
+ def push_hit(hit, output_queue, root_field = '_source')
+ event = targeted_event_factory.new_event hit[root_field]
set_docinfo_fields(hit, event) if @docinfo
decorate(event)
output_queue << event
end
@@ -641,16 +646,23 @@
api
else
@search_api
end
+ end
- @paginated_search = if @resolved_search_api == "search_after"
+ def setup_query_executor
+ @query_executor = case @response_type
+ when 'hits'
+ if @resolved_search_api == "search_after"
LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self)
else
logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8
LogStash::Inputs::Elasticsearch::Scroll.new(@client, self)
end
+ when 'aggregations'
+ LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self)
+ end
end
module URIOrEmptyValidator
##
# @override to provide :uri_or_empty validator