lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.13.0 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.14.0
- old
+ new
@@ -6,10 +6,11 @@
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
require 'logstash/plugin_mixins/event_support/event_factory_adapter'
require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
require 'logstash/plugin_mixins/ca_trusted_fingerprint_support'
+require "logstash/plugin_mixins/scheduler"
require "base64"
require "elasticsearch"
require "elasticsearch/transport/transport/http/manticore"
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
@@ -76,10 +77,12 @@
include LogStash::PluginMixins::EventSupport::EventFactoryAdapter
extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
+ include LogStash::PluginMixins::Scheduler
+
config_name "elasticsearch"
# List of elasticsearch hosts to use for querying.
# Each host can be either IP, HOST, IP:port or HOST:port.
# Port defaults to 9200
@@ -249,35 +252,28 @@
end
def run(output_queue)
if @schedule
- @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
- @scheduler.cron @schedule do
- do_run(output_queue)
- end
-
- @scheduler.join
+ scheduler.cron(@schedule) { do_run(output_queue) }
+ scheduler.join
else
do_run(output_queue)
end
end
- def stop
- @scheduler.stop if @scheduler
- end
-
private
def do_run(output_queue)
# if configured to run a single slice, don't bother spinning up threads
return do_run_slice(output_queue) if @slices.nil? || @slices <= 1
logger.warn("managed slices for query is very large (#{@slices}); consider reducing") if @slices > 8
+ pipeline_id = execution_context&.pipeline_id || 'main'
@slices.times.map do |slice_id|
Thread.new do
- LogStash::Util::set_thread_name("#{@id}_slice_#{slice_id}")
+ LogStash::Util::set_thread_name("[#{pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
do_run_slice(output_queue, slice_id)
end
end.map(&:join)
end