lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-2.0.6 vs lib/logstash/inputs/http_poller.rb in logstash-input-http_poller-2.1.0
- old
+ new
@@ -2,10 +2,11 @@
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/plugin_mixins/http_client"
require "socket" # for Socket.gethostname
require "manticore"
+require "rufus/scheduler"
# This Logstash input plugin allows you to call an HTTP API, decode the output of it into event(s), and
# send them on their merry way. The idea behind this plugin came from a need to read springboot
# metrics endpoint, instead of configuring jmx to monitor my java application memory/gc/ etc.
#
@@ -31,11 +32,12 @@
# password => "hunter2"
# }
# }
# }
# request_timeout => 60
-# interval => 60
+# # Supports "cron", "every", "at" and "in" schedules by rufus scheduler
+# schedule => { cron => "* * * * * UTC"}
# codec => "json"
# # A hash of request metadata info (timing, response headers, etc.) will be sent here
# metadata_target => "http_poller_metadata"
# }
# }
@@ -57,32 +59,47 @@
# A Hash of urls in this format : `"name" => "url"`.
# The name and the url will be passed in the outputted event
config :urls, :validate => :hash, :required => true
# How often (in seconds) the urls will be called
- config :interval, :validate => :number, :required => true
+ # DEPRECATED. Use 'schedule' option instead.
+ # Exactly one of the interval and schedule options must be specified;
+ # setting both (or neither) raises a configuration error when the input runs.
+ config :interval, :validate => :number, :deprecated => true
+ # Schedule of when to periodically poll from the urls
+ # Format: A hash with
+ # + key: "cron" | "every" | "in" | "at"
+ # + value: string
+ # Examples:
+ # a) { "every" => "1h" }
+ # b) { "cron" => "* * * * * UTC" }
+ # See: rufus/scheduler for details about different schedule options and value string format
+ config :schedule, :validate => :hash
+
# Define the target field for placing the received data. If this setting is omitted, the data will be stored at the root (top level) of the event.
config :target, :validate => :string
# If you'd like to work with the request/response metadata.
# Set this value to the name of the field you'd like to store a nested
# hash of metadata.
config :metadata_target, :validate => :string, :default => '@metadata'
public
# Keys accepted in the `schedule` hash. setup_schedule rejects any other key
# and uses the accepted key as the name of the scheduler method to invoke.
+ Schedule_types = %w(cron every at in)
# Plugin setup: stores the local hostname (forced to UTF-8 encoding) in @host,
# logs the configured urls/interval/schedule/timeout, and then calls
# setup_requests! to prepare the requests derived from @urls.
def register
@host = Socket.gethostname.force_encoding(Encoding::UTF_8)
@logger.info("Registering http_poller Input", :type => @type,
- :urls => @urls, :interval => @interval, :timeout => @timeout)
+ :urls => @urls, :interval => @interval, :schedule => @schedule, :timeout => @timeout)
setup_requests!
end
# Shuts down whichever polling mechanism was started: asks Stud to stop the
# interval loop when @interval_thread was recorded (interval mode), and stops
# the Rufus scheduler when one was created (schedule mode).
def stop
Stud.stop!(@interval_thread) if @interval_thread
+ @scheduler.stop if @scheduler
end
private
def setup_requests!
@requests = Hash[@urls.map {|name, url| [name, normalize_request(url)] }]
@@ -131,16 +148,49 @@
request
end
public
def run(queue)
+ #interval or schedule must be provided. Must be exclusively either one. Not neither. Not both.
+ raise LogStash::ConfigurationError, "Invalid config. Neither interval nor schedule was specified." \
+ unless @interval || @schedule
+ raise LogStash::ConfigurationError, "Invalid config. Specify only interval or schedule. Not both." \
+ if @interval && @schedule
+
+ if @interval
+ setup_interval(queue)
+ elsif @schedule
+ setup_schedule(queue)
+ else
+ #should not reach here
+ raise LogStash::ConfigurationError, "Invalid config. Neither interval nor schedule was specified."
+ end
+ end
+
+ private
# Interval mode: records the current thread in @interval_thread (stop hands it
# to Stud.stop!) and calls run_once(queue) from inside Stud.interval(@interval).
+ def setup_interval(queue)
@interval_thread = Thread.current
Stud.interval(@interval) do
run_once(queue)
end
end
- private
+ def setup_schedule(queue)
+ #schedule hash must contain exactly one of the allowed keys
+ msg_invalid_schedule = "Invalid config. schedule hash must contain " +
+ "exactly one of the following keys - cron, at, every or in"
+ raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length !=1
+ schedule_type = @schedule.keys.first
+ schedule_value = @schedule[schedule_type]
+ raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)
+
+ @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
+ #as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
+ opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
+ @scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }
+ @scheduler.join
+ end
+
def run_once(queue)
@requests.each do |name, request|
request_async(queue, name, request)
end