lib/logstash/inputs/exec.rb in logstash-input-exec-3.1.5 vs lib/logstash/inputs/exec.rb in logstash-input-exec-3.2.0
- old
+ new
@@ -1,10 +1,11 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "socket" # for Socket.gethostname
require "stud/interval"
+require "rufus/scheduler"
# Periodically run a shell command and capture the whole output as an event.
#
# Notes:
#
@@ -15,46 +16,71 @@
config_name "exec"
default :codec, "plain"
- # Command to run. For example, `uptime`
+ # Command to run. For example : `uptime`
config :command, :validate => :string, :required => true
# Interval to run the command. Value is in seconds.
- config :interval, :validate => :number, :required => true
+ # Either `interval` or `schedule` option must be defined.
+ config :interval, :validate => :number
+ # Schedule of when to periodically run command, in Cron format
+ # For example: "* * * * *" (execute command every minute, on the minute)
+ # Either `interval` or `schedule` option must be defined.
+ config :schedule, :validate => :string
+
def register
- @logger.info("Registering Exec Input", :type => @type, :command => @command, :interval => @interval)
+ @logger.info("Registering Exec Input", :type => @type, :command => @command, :interval => @interval, :schedule => @schedule)
@hostname = Socket.gethostname
@io = nil
+
+ if (@interval.nil? && @schedule.nil?) || (@interval && @schedule)
+ raise LogStash::ConfigurationError, "jdbc input: either 'interval' or 'schedule' option must be defined."
+ end
end # def register
def run(queue)
- while !stop?
- inner_run(queue)
- end # loop
+ if @schedule
+ @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
+ @scheduler.cron @schedule do
+ inner_run(queue)
+ end
+ @scheduler.join
+ else
+ while !stop?
+ duration = inner_run(queue)
+ wait_until_end_of_interval(duration)
+ end # loop
+ end
end # def run
def inner_run(queue)
start = Time.now
execute(@command, queue)
duration = Time.now - start
@logger.debug? && @logger.debug("Command completed", :command => @command, :duration => duration)
- wait_until_end_of_interval(duration)
+ return duration
end
def stop
+ close_io()
+ @scheduler.shutdown(:wait) if @scheduler
+ end
+
+ private
+
+ # Close @io
+ def close_io
return if @io.nil? || @io.closed?
@io.close
@io = nil
end
- private
-
# Wait until the end of the interval
# @param [Integer] the duration of the last command executed
def wait_until_end_of_interval(duration)
# Sleep for the remainder of the interval, or 0 if the duration ran
# longer than the interval.
@@ -85,9 +111,9 @@
:command => command, :e => e, :backtrace => e.backtrace)
rescue Exception => e
@logger.error("Exception while running command",
:command => command, :e => e, :backtrace => e.backtrace)
ensure
- stop
+ close_io()
end
end
end # class LogStash::Inputs::Exec