lib/logstash/inputs/exec.rb in logstash-input-exec-3.4.0 vs lib/logstash/inputs/exec.rb in logstash-input-exec-3.6.0

- old
+ new

@@ -1,13 +1,14 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" +require "open3" require "socket" # for Socket.gethostname require "stud/interval" -require "rufus/scheduler" require 'logstash/plugin_mixins/ecs_compatibility_support' +require "logstash/plugin_mixins/scheduler" # Periodically run a shell command and capture the whole output as an event. # # Notes: # @@ -15,10 +16,11 @@ # * The `message` field of this event will be the entire stdout of the command. # class LogStash::Inputs::Exec < LogStash::Inputs::Base include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) + include LogStash::PluginMixins::Scheduler config_name "exec" default :codec, "plain" @@ -34,12 +36,11 @@ # Either `interval` or `schedule` option must be defined. config :schedule, :validate => :string def register @hostname = Socket.gethostname.freeze - @io = nil - + if (@interval.nil? && @schedule.nil?) || (@interval && @schedule) raise LogStash::ConfigurationError, "exec input: either 'interval' or 'schedule' option must be defined." end @host_name_field = ecs_select[disabled: 'host', v1: '[host][name]'] @@ -51,26 +52,22 @@ @legacy_duration_field = ecs_select[disabled: '[@metadata][duration]', v1: nil] # in seconds end # def register def run(queue) if @schedule - @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) - @scheduler.cron @schedule do - execute(queue) - end - @scheduler.join + scheduler.cron(@schedule) { execute(queue) } + scheduler.join else while !stop? duration = execute(queue) wait_until_end_of_interval(duration) end # loop end end # def run def stop - close_io() - @scheduler.shutdown(:wait) if @scheduler + close_out_and_in end # Execute a given command # @param queue the LS queue to append events to def execute(queue) @@ -103,23 +100,29 @@ end private def run_command - @io = IO.popen(@command) - output = @io.read - @io.close # required in order to read $? - exit_status = $?.exitstatus + @p_in, @p_out, waiter = Open3.popen2(@command) + output = @p_out.read + exit_status = waiter.value.exitstatus [output, exit_status] ensure - close_io() + close_out_and_in end - # Close @io - def close_io - return if @io.nil? || @io.closed? - @io.close - @io = nil + def close_out_and_in + close_io(@p_out) + @p_out = nil + close_io(@p_in) + @p_in = nil + end + + def close_io(io) + return if io.nil? || io.closed? + io.close + rescue => e + @logger.debug("ignoring exception raised while closing io", :io => io, :exception => e.class, :message => e.message) end # Wait until the end of the interval # @param duration [Integer] the duration of the last command executed def wait_until_end_of_interval(duration)