lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-3.0.3 vs lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-3.1.0

- old
+ new

@@ -2,10 +2,11 @@ require "logstash/inputs/base" require "logstash/namespace" require "logstash/plugin_mixins/http_client" require "socket" # for Socket.gethostname require "manticore" +require "rufus/scheduler" # This Logstash input plugin allows you to call an HTTP API, decode the output of it into event(s), and # send them on their merry way. The idea behind this plugins came from a need to read springboot # metrics endpoint, instead of configuring jmx to monitor my java application memory/gc/ etc. # @@ -31,11 +32,12 @@ # password => "hunter2" # } # } # } # request_timeout => 60 -# interval => 60 +# # Supports "cron", "every", "at" and "in" schedules by rufus scheduler +# schedule => { cron => "* * * * * UTC"} # codec => "json" # # A hash of request metadata info (timing, response headers, etc.) will be sent here # metadata_target => "http_poller_metadata" # } # } @@ -49,13 +51,14 @@ # # Using the HTTP poller with custom a custom CA or self signed cert. # # If you have a self signed cert you will need to convert your server's certificate to a valid# `.jks` or `.p12` file. An easy way to do it is to run the following one-liner, substituting your server's URL for the placeholder `MYURL` and `MYPORT`. # -#.... +# [source,ruby] +# ---------------------------------- # openssl s_client -showcerts -connect MYURL:MYPORT </dev/null 2>/dev/null|openssl x509 -outform PEM > downloaded_cert.pem; keytool -import -alias test -file downloaded_cert.pem -keystore downloaded_truststore.jks -#.... +# ---------------------------------- # # The above snippet will create two files `downloaded_cert.pem` and `downloaded_truststore.jks`. You will be prompted to set a password for the `jks` file during this process. To configure logstash use a config like the one that follows. # # # [source,ruby] @@ -68,13 +71,13 @@ # truststore => "/path/to/downloaded_truststore.jks" # truststore_password => "mypassword" # interval => 30 # } #} +# ---------------------------------- # - class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base include LogStash::PluginMixins::HttpClient config_name "http_poller" @@ -83,32 +86,47 @@ # A Hash of urls in this format : `"name" => "url"`. # The name and the url will be passed in the outputed event config :urls, :validate => :hash, :required => true # How often (in seconds) the urls will be called - config :interval, :validate => :number, :required => true + # DEPRECATED. Use 'schedule' option instead. + # If both interval and schedule options are specified, interval + # option takes higher precedence + config :interval, :validate => :number, :deprecated => true + # Schedule of when to periodically poll from the urls + # Format: A hash with + # + key: "cron" | "every" | "in" | "at" + # + value: string + # Examples: + # a) { "every" => "1h" } + # b) { "cron" => "* * * * * UTC" } + # See: rufus/scheduler for details about different schedule options and value string format + config :schedule, :validate => :hash + # Define the target field for placing the received data. If this setting is omitted, the data will be stored at the root (top level) of the event. config :target, :validate => :string # If you'd like to work with the request/response metadata. # Set this value to the name of the field you'd like to store a nested # hash of metadata. config :metadata_target, :validate => :string, :default => '@metadata' public + Schedule_types = %w(cron every at in) def register @host = Socket.gethostname.force_encoding(Encoding::UTF_8) @logger.info("Registering http_poller Input", :type => @type, - :urls => @urls, :interval => @interval, :timeout => @timeout) + :urls => @urls, :interval => @interval, :schedule => @schedule, :timeout => @timeout) setup_requests! end def stop Stud.stop!(@interval_thread) if @interval_thread + @scheduler.stop if @scheduler end private def setup_requests! @requests = Hash[@urls.map {|name, url| [name, normalize_request(url)] }] @@ -157,16 +175,49 @@ request end public def run(queue) + #interval or schedule must be provided. Must be exclusively either one. Not neither. Not both. + raise LogStash::ConfigurationError, "Invalid config. Neither interval nor schedule was specified." \ + unless @interval || @schedule + raise LogStash::ConfigurationError, "Invalid config. Specify only interval or schedule. Not both." \ + if @interval && @schedule + + if @interval + setup_interval(queue) + elsif @schedule + setup_schedule(queue) + else + #should not reach here + raise LogStash::ConfigurationError, "Invalid config. Neither interval nor schedule was specified." + end + end + + private + def setup_interval(queue) @interval_thread = Thread.current Stud.interval(@interval) do run_once(queue) end end - private + def setup_schedule(queue) + #schedule hash must contain exactly one of the allowed keys + msg_invalid_schedule = "Invalid config. schedule hash must contain " + + "exactly one of the following keys - cron, at, every or in" + raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length !=1 + schedule_type = @schedule.keys.first + schedule_value = @schedule[schedule_type] + raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type) + + @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) + #as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead + opts = schedule_type == "every" ? { :first_in => 0.01 } : {} + @scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) } + @scheduler.join + end + def run_once(queue) @requests.each do |name, request| request_async(queue, name, request) end