lib/logstash/inputs/exec.rb in logstash-input-exec-3.1.5 vs lib/logstash/inputs/exec.rb in logstash-input-exec-3.2.0

- old
+ new

@@ -1,10 +1,11 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" require "socket" # for Socket.gethostname require "stud/interval" +require "rufus/scheduler" # Periodically run a shell command and capture the whole output as an event. # # Notes: # @@ -15,46 +16,71 @@ config_name "exec" default :codec, "plain" - # Command to run. For example, `uptime` + # Command to run. For example : `uptime` config :command, :validate => :string, :required => true # Interval to run the command. Value is in seconds. - config :interval, :validate => :number, :required => true + # Either `interval` or `schedule` option must be defined. + config :interval, :validate => :number + # Schedule of when to periodically run command, in Cron format + # For example: "* * * * *" (execute command every minute, on the minute) + # Either `interval` or `schedule` option must be defined. + config :schedule, :validate => :string + def register - @logger.info("Registering Exec Input", :type => @type, :command => @command, :interval => @interval) + @logger.info("Registering Exec Input", :type => @type, :command => @command, :interval => @interval, :schedule => @schedule) @hostname = Socket.gethostname @io = nil + + if (@interval.nil? && @schedule.nil?) || (@interval && @schedule) + raise LogStash::ConfigurationError, "jdbc input: either 'interval' or 'schedule' option must be defined." + end end # def register def run(queue) - while !stop? - inner_run(queue) - end # loop + if @schedule + @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) + @scheduler.cron @schedule do + inner_run(queue) + end + @scheduler.join + else + while !stop? + duration = inner_run(queue) + wait_until_end_of_interval(duration) + end # loop + end end # def run def inner_run(queue) start = Time.now execute(@command, queue) duration = Time.now - start @logger.debug? && @logger.debug("Command completed", :command => @command, :duration => duration) - wait_until_end_of_interval(duration) + return duration end def stop + close_io() + @scheduler.shutdown(:wait) if @scheduler + end + + private + + # Close @io + def close_io return if @io.nil? || @io.closed? @io.close @io = nil end - private - # Wait until the end of the interval # @param [Integer] the duration of the last command executed def wait_until_end_of_interval(duration) # Sleep for the remainder of the interval, or 0 if the duration ran # longer than the interval. @@ -85,9 +111,9 @@ :command => command, :e => e, :backtrace => e.backtrace) rescue Exception => e @logger.error("Exception while running command", :command => command, :e => e, :backtrace => e.backtrace) ensure - stop + close_io() end end end # class LogStash::Inputs::Exec