lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.13.0 vs lib/logstash/inputs/elasticsearch.rb in logstash-input-elasticsearch-4.14.0

- old
+ new

@@ -6,10 +6,11 @@ require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter' require 'logstash/plugin_mixins/event_support/event_factory_adapter' require 'logstash/plugin_mixins/ecs_compatibility_support' require 'logstash/plugin_mixins/ecs_compatibility_support/target_check' require 'logstash/plugin_mixins/ca_trusted_fingerprint_support' +require "logstash/plugin_mixins/scheduler" require "base64" require "elasticsearch" require "elasticsearch/transport/transport/http/manticore" require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore" @@ -76,10 +77,12 @@ include LogStash::PluginMixins::EventSupport::EventFactoryAdapter extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter + include LogStash::PluginMixins::Scheduler + config_name "elasticsearch" # List of elasticsearch hosts to use for querying. # Each host can be either IP, HOST, IP:port or HOST:port. # Port defaults to 9200 @@ -249,35 +252,28 @@ end def run(output_queue) if @schedule - @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) - @scheduler.cron @schedule do - do_run(output_queue) - end - - @scheduler.join + scheduler.cron(@schedule) { do_run(output_queue) } + scheduler.join else do_run(output_queue) end end - def stop - @scheduler.stop if @scheduler - end - private def do_run(output_queue) # if configured to run a single slice, don't bother spinning up threads return do_run_slice(output_queue) if @slices.nil? || @slices <= 1 logger.warn("managed slices for query is very large (#{@slices}); consider reducing") if @slices > 8 + pipeline_id = execution_context&.pipeline_id || 'main' @slices.times.map do |slice_id| Thread.new do - LogStash::Util::set_thread_name("#{@id}_slice_#{slice_id}") + LogStash::Util::set_thread_name("[#{pipeline_id}]|input|elasticsearch|slice_#{slice_id}") do_run_slice(output_queue, slice_id) end end.map(&:join) end