lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-5.3.1 vs lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-5.4.0

- old
+ new

@@ -2,24 +2,26 @@ require "logstash/inputs/base" require "logstash/namespace" require "logstash/plugin_mixins/http_client" require "socket" # for Socket.gethostname require "manticore" -require "rufus/scheduler" require "logstash/plugin_mixins/ecs_compatibility_support" require 'logstash/plugin_mixins/ecs_compatibility_support/target_check' require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter' require 'logstash/plugin_mixins/event_support/event_factory_adapter' +require 'logstash/plugin_mixins/scheduler' class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base include LogStash::PluginMixins::HttpClient include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck include LogStash::PluginMixins::EventSupport::EventFactoryAdapter extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter + include LogStash::PluginMixins::Scheduler + config_name "http_poller" default :codec, "json" # A Hash of urls in this format : `"name" => "url"`. @@ -43,43 +45,38 @@ # Set this value to the name of the field you'd like to store a nested # hash of metadata. config :metadata_target, :validate => :string, :default => '@metadata' public - Schedule_types = %w(cron every at in) def register @host = Socket.gethostname.force_encoding(Encoding::UTF_8) setup_ecs_field! setup_requests! end # @overload def stop - shutdown_scheduler_and_close_client(:wait) + close_client end # @overload def close - shutdown_scheduler_and_close_client + close_client end - def shutdown_scheduler_and_close_client(opt = nil) + def close_client @logger.debug("closing http client", client: client) begin client.close # since Manticore 0.9.0 this shuts-down/closes all resources rescue => e details = { exception: e.class, message: e.message } details[:backtrace] = e.backtrace if @logger.debug? @logger.warn "failed closing http client", details end - if @scheduler - @logger.debug("shutting down scheduler", scheduler: @scheduler) - @scheduler.shutdown(opt) # on newer Rufus (3.8) this joins on the scheduler thread - end end - private :shutdown_scheduler_and_close_client + private :close_client private def setup_requests! @requests = Hash[@urls.map {|name, url| [name, normalize_request(url)] }] end @@ -183,15 +180,14 @@ msg_invalid_schedule = "Invalid config. schedule hash must contain " + "exactly one of the following keys - cron, at, every or in" raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length != 1 schedule_type = @schedule.keys.first schedule_value = @schedule[schedule_type] - raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type) + raise LogStash::ConfigurationError, msg_invalid_schedule unless %w(cron every at in).include?(schedule_type) - @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) - opts = schedule_type == "every" ? { :first_in => 0.01 } : {} - @scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) } - @scheduler.thread.join # due newer rufus (3.8) doing a blocking operation on scheduler.join + opts = schedule_type == "every" ? { first_in: 0.01 } : {} + scheduler.public_send(schedule_type, schedule_value, opts) { run_once(queue) } + scheduler.join end def run_once(queue) @requests.each do |name, request| # prevent executing a scheduler kick after the plugin has been stop-ed