lib/logstash/inputs/exec.rb in logstash-input-exec-3.4.0 vs lib/logstash/inputs/exec.rb in logstash-input-exec-3.6.0
- old
+ new
@@ -1,13 +1,14 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
+require "open3"
require "socket" # for Socket.gethostname
require "stud/interval"
-require "rufus/scheduler"
require 'logstash/plugin_mixins/ecs_compatibility_support'
+require "logstash/plugin_mixins/scheduler"
# Periodically run a shell command and capture the whole output as an event.
#
# Notes:
#
@@ -15,10 +16,11 @@
# * The `message` field of this event will be the entire stdout of the command.
#
class LogStash::Inputs::Exec < LogStash::Inputs::Base
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
+ include LogStash::PluginMixins::Scheduler
config_name "exec"
default :codec, "plain"
@@ -34,12 +36,11 @@
# Either `interval` or `schedule` option must be defined.
config :schedule, :validate => :string
def register
@hostname = Socket.gethostname.freeze
- @io = nil
-
+
if (@interval.nil? && @schedule.nil?) || (@interval && @schedule)
raise LogStash::ConfigurationError, "exec input: either 'interval' or 'schedule' option must be defined."
end
@host_name_field = ecs_select[disabled: 'host', v1: '[host][name]']
@@ -51,26 +52,22 @@
@legacy_duration_field = ecs_select[disabled: '[@metadata][duration]', v1: nil] # in seconds
end # def register
def run(queue)
if @schedule
- @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
- @scheduler.cron @schedule do
- execute(queue)
- end
- @scheduler.join
+ scheduler.cron(@schedule) { execute(queue) }
+ scheduler.join
else
while !stop?
duration = execute(queue)
wait_until_end_of_interval(duration)
end # loop
end
end # def run
def stop
- close_io()
- @scheduler.shutdown(:wait) if @scheduler
+ close_out_and_in
end
# Execute a given command
# @param queue the LS queue to append events to
def execute(queue)
@@ -103,23 +100,29 @@
end
private
def run_command
- @io = IO.popen(@command)
- output = @io.read
- @io.close # required in order to read $?
- exit_status = $?.exitstatus
+ @p_in, @p_out, waiter = Open3.popen2(@command)
+ output = @p_out.read
+ exit_status = waiter.value.exitstatus
[output, exit_status]
ensure
- close_io()
+ close_out_and_in
end
- # Close @io
- def close_io
- return if @io.nil? || @io.closed?
- @io.close
- @io = nil
+ def close_out_and_in
+ close_io(@p_out)
+ @p_out = nil
+ close_io(@p_in)
+ @p_in = nil
+ end
+
+ def close_io(io)
+ return if io.nil? || io.closed?
+ io.close
+ rescue => e
+ @logger.debug("ignoring exception raised while closing io", :io => io, :exception => e.class, :message => e.message)
end
# Wait until the end of the interval
# @param duration [Integer] the duration of the last command executed
def wait_until_end_of_interval(duration)