lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-5.3.1 vs lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-5.4.0
- old
+ new
@@ -2,24 +2,26 @@
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/plugin_mixins/http_client"
require "socket" # for Socket.gethostname
require "manticore"
-require "rufus/scheduler"
require "logstash/plugin_mixins/ecs_compatibility_support"
require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
require 'logstash/plugin_mixins/event_support/event_factory_adapter'
+require 'logstash/plugin_mixins/scheduler'
class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base
include LogStash::PluginMixins::HttpClient
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
include LogStash::PluginMixins::EventSupport::EventFactoryAdapter
extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
+ include LogStash::PluginMixins::Scheduler
+
config_name "http_poller"
default :codec, "json"
# A Hash of urls in this format : `"name" => "url"`.
@@ -43,43 +45,38 @@
# Set this value to the name of the field you'd like to store a nested
# hash of metadata.
config :metadata_target, :validate => :string, :default => '@metadata'
public
- Schedule_types = %w(cron every at in)
def register
@host = Socket.gethostname.force_encoding(Encoding::UTF_8)
setup_ecs_field!
setup_requests!
end
# @overload
def stop
- shutdown_scheduler_and_close_client(:wait)
+ close_client
end
# @overload
def close
- shutdown_scheduler_and_close_client
+ close_client
end
- def shutdown_scheduler_and_close_client(opt = nil)
+ def close_client
@logger.debug("closing http client", client: client)
begin
client.close # since Manticore 0.9.0 this shuts-down/closes all resources
rescue => e
details = { exception: e.class, message: e.message }
details[:backtrace] = e.backtrace if @logger.debug?
@logger.warn "failed closing http client", details
end
- if @scheduler
- @logger.debug("shutting down scheduler", scheduler: @scheduler)
- @scheduler.shutdown(opt) # on newer Rufus (3.8) this joins on the scheduler thread
- end
end
- private :shutdown_scheduler_and_close_client
+ private :close_client
private
def setup_requests!
@requests = Hash[@urls.map {|name, url| [name, normalize_request(url)] }]
end
@@ -183,15 +180,14 @@
msg_invalid_schedule = "Invalid config. schedule hash must contain " +
"exactly one of the following keys - cron, at, every or in"
raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length != 1
schedule_type = @schedule.keys.first
schedule_value = @schedule[schedule_type]
- raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)
+ raise LogStash::ConfigurationError, msg_invalid_schedule unless %w(cron every at in).include?(schedule_type)
- @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
- opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
- @scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }
- @scheduler.thread.join # due newer rufus (3.8) doing a blocking operation on scheduler.join
+ opts = schedule_type == "every" ? { first_in: 0.01 } : {}
+ scheduler.public_send(schedule_type, schedule_value, opts) { run_once(queue) }
+ scheduler.join
end
def run_once(queue)
@requests.each do |name, request|
# prevent executing a scheduler kick after the plugin has been stop-ed