lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-3.0.3 vs lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-3.1.0
- old
+ new
@@ -2,10 +2,11 @@
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/plugin_mixins/http_client"
require "socket" # for Socket.gethostname
require "manticore"
+require "rufus/scheduler"
# This Logstash input plugin allows you to call an HTTP API, decode the output of it into event(s), and
# send them on their merry way. The idea behind this plugins came from a need to read springboot
# metrics endpoint, instead of configuring jmx to monitor my java application memory/gc/ etc.
#
@@ -31,11 +32,12 @@
# password => "hunter2"
# }
# }
# }
# request_timeout => 60
-# interval => 60
+# # Supports "cron", "every", "at" and "in" schedules by rufus scheduler
+# schedule => { cron => "* * * * * UTC"}
# codec => "json"
# # A hash of request metadata info (timing, response headers, etc.) will be sent here
# metadata_target => "http_poller_metadata"
# }
# }
@@ -49,13 +51,14 @@
#
# Using the HTTP poller with custom a custom CA or self signed cert.
#
# If you have a self signed cert you will need to convert your server's certificate to a valid# `.jks` or `.p12` file. An easy way to do it is to run the following one-liner, substituting your server's URL for the placeholder `MYURL` and `MYPORT`.
#
-#....
+# [source,ruby]
+# ----------------------------------
# openssl s_client -showcerts -connect MYURL:MYPORT </dev/null 2>/dev/null|openssl x509 -outform PEM > downloaded_cert.pem; keytool -import -alias test -file downloaded_cert.pem -keystore downloaded_truststore.jks
-#....
+# ----------------------------------
#
# The above snippet will create two files `downloaded_cert.pem` and `downloaded_truststore.jks`. You will be prompted to set a password for the `jks` file during this process. To configure logstash use a config like the one that follows.
#
#
# [source,ruby]
@@ -68,13 +71,13 @@
# truststore => "/path/to/downloaded_truststore.jks"
# truststore_password => "mypassword"
# interval => 30
# }
#}
+# ----------------------------------
#
-
class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base
include LogStash::PluginMixins::HttpClient
config_name "http_poller"
@@ -83,32 +86,47 @@
# A Hash of urls in this format : `"name" => "url"`.
# The name and the url will be passed in the outputed event
config :urls, :validate => :hash, :required => true
# How often (in seconds) the urls will be called
- config :interval, :validate => :number, :required => true
+ # DEPRECATED. Use 'schedule' option instead.
+ # If both interval and schedule options are specified, interval
+ # option takes higher precedence
+ config :interval, :validate => :number, :deprecated => true
+ # Schedule of when to periodically poll from the urls
+ # Format: A hash with
+ # + key: "cron" | "every" | "in" | "at"
+ # + value: string
+ # Examples:
+ # a) { "every" => "1h" }
+ # b) { "cron" => "* * * * * UTC" }
+ # See: rufus/scheduler for details about different schedule options and value string format
+ config :schedule, :validate => :hash
+
# Define the target field for placing the received data. If this setting is omitted, the data will be stored at the root (top level) of the event.
config :target, :validate => :string
# If you'd like to work with the request/response metadata.
# Set this value to the name of the field you'd like to store a nested
# hash of metadata.
config :metadata_target, :validate => :string, :default => '@metadata'
public
+ Schedule_types = %w(cron every at in)
def register
@host = Socket.gethostname.force_encoding(Encoding::UTF_8)
@logger.info("Registering http_poller Input", :type => @type,
- :urls => @urls, :interval => @interval, :timeout => @timeout)
+ :urls => @urls, :interval => @interval, :schedule => @schedule, :timeout => @timeout)
setup_requests!
end
def stop
Stud.stop!(@interval_thread) if @interval_thread
+ @scheduler.stop if @scheduler
end
private
def setup_requests!
@requests = Hash[@urls.map {|name, url| [name, normalize_request(url)] }]
@@ -157,16 +175,49 @@
request
end
public
def run(queue)
+ #interval or schedule must be provided. Must be exclusively either one. Not neither. Not both.
+ raise LogStash::ConfigurationError, "Invalid config. Neither interval nor schedule was specified." \
+ unless @interval || @schedule
+ raise LogStash::ConfigurationError, "Invalid config. Specify only interval or schedule. Not both." \
+ if @interval && @schedule
+
+ if @interval
+ setup_interval(queue)
+ elsif @schedule
+ setup_schedule(queue)
+ else
+ #should not reach here
+ raise LogStash::ConfigurationError, "Invalid config. Neither interval nor schedule was specified."
+ end
+ end
+
+ private
+ def setup_interval(queue)
@interval_thread = Thread.current
Stud.interval(@interval) do
run_once(queue)
end
end
- private
+ def setup_schedule(queue)
+ #schedule hash must contain exactly one of the allowed keys
+ msg_invalid_schedule = "Invalid config. schedule hash must contain " +
+ "exactly one of the following keys - cron, at, every or in"
+ raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length !=1
+ schedule_type = @schedule.keys.first
+ schedule_value = @schedule[schedule_type]
+ raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)
+
+ @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
+ #as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
+ opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
+ @scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }
+ @scheduler.join
+ end
+
def run_once(queue)
@requests.each do |name, request|
request_async(queue, name, request)
end