lib/logstash/inputs/exec.rb in logstash-input-exec-3.3.3 vs lib/logstash/inputs/exec.rb in logstash-input-exec-3.4.0
- old
+ new
@@ -3,19 +3,23 @@
require "logstash/namespace"
require "socket" # for Socket.gethostname
require "stud/interval"
require "rufus/scheduler"
+require 'logstash/plugin_mixins/ecs_compatibility_support'
+
# Periodically run a shell command and capture the whole output as an event.
#
# Notes:
#
# * The `command` field of this event will be the command run.
# * The `message` field of this event will be the entire stdout of the command.
#
class LogStash::Inputs::Exec < LogStash::Inputs::Base
+ include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
+
config_name "exec"
default :codec, "plain"
# Command to run. For example : `uptime`
@@ -29,17 +33,24 @@
# For example: "* * * * *" (execute command every minute, on the minute)
# Either `interval` or `schedule` option must be defined.
config :schedule, :validate => :string
def register
- @logger.info("Registering Exec Input", :type => @type, :command => @command, :interval => @interval, :schedule => @schedule)
- @hostname = Socket.gethostname
+ @hostname = Socket.gethostname.freeze
@io = nil
if (@interval.nil? && @schedule.nil?) || (@interval && @schedule)
raise LogStash::ConfigurationError, "exec input: either 'interval' or 'schedule' option must be defined."
end
+
+ @host_name_field = ecs_select[disabled: 'host', v1: '[host][name]']
+ @process_command_line_field = ecs_select[disabled: 'command', v1: '[process][command_line]']
+ @process_exit_code_field = ecs_select[disabled: '[@metadata][exit_status]', v1: '[process][exit_code]']
+
+ # migrate elapsed time tracking to whole nanos, from legacy floating-point fractional seconds
+ @process_elapsed_time_field = ecs_select[disabled: nil, v1: '[@metadata][input][exec][process][elapsed_time]'] # in nanos
+ @legacy_duration_field = ecs_select[disabled: '[@metadata][duration]', v1: nil] # in seconds
end # def register
def run(queue)
if @schedule
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
@@ -59,34 +70,34 @@
close_io()
@scheduler.shutdown(:wait) if @scheduler
end
# Execute a given command
- # @param [String] A command string
- # @param [Array or Queue] A queue to append events to
+ # @param queue the LS queue to append events to
def execute(queue)
start = Time.now
output = exit_status = nil
begin
@logger.debug? && @logger.debug("Running exec", :command => @command)
output, exit_status = run_command()
rescue StandardError => e
@logger.error("Error while running command",
- :command => @command, :e => e, :backtrace => e.backtrace)
+ :command => @command, :exception => e, :backtrace => e.backtrace)
rescue Exception => e
@logger.error("Exception while running command",
- :command => @command, :e => e, :backtrace => e.backtrace)
+ :command => @command, :exception => e, :backtrace => e.backtrace)
end
- duration = Time.now - start
- @logger.debug? && @logger.debug("Command completed", :command => @command, :duration => duration)
+ duration = Time.now.to_r - start.to_r
+ @logger.debug? && @logger.debug("Command completed", :command => @command, :duration => duration.to_f)
if output
@codec.decode(output) do |event|
decorate(event)
- event.set("host", @hostname)
- event.set("command", @command)
- event.set("[@metadata][duration]", duration)
- event.set("[@metadata][exit_status]", exit_status)
+ event.set(@host_name_field, @hostname) unless event.include?(@host_name_field)
+ event.set(@process_command_line_field, @command) unless event.include?(@process_command_line_field)
+ event.set(@process_exit_code_field, exit_status) unless event.include?(@process_exit_code_field)
+ event.set(@process_elapsed_time_field, to_nanos(duration)) if @process_elapsed_time_field
+ event.set(@legacy_duration_field, duration.to_f) if @legacy_duration_field
queue << event
end
end
duration
end
@@ -95,11 +106,11 @@
def run_command
@io = IO.popen(@command)
output = @io.read
@io.close # required in order to read $?
- exit_status = $?.exitstatus # should be threadsafe as per rb_thread_save_context
+ exit_status = $?.exitstatus
[output, exit_status]
ensure
close_io()
end
@@ -109,11 +120,11 @@
@io.close
@io = nil
end
# Wait until the end of the interval
- # @param [Integer] the duration of the last command executed
+ # @param duration [Integer] the duration of the last command executed
def wait_until_end_of_interval(duration)
# Sleep for the remainder of the interval, or 0 if the duration ran
# longer than the interval.
sleeptime = [0, @interval - duration].max
if sleeptime > 0
@@ -122,7 +133,12 @@
@logger.warn("Execution ran longer than the interval. Skipping sleep.",
:command => @command, :duration => duration, :interval => @interval)
end
end
+ # convert seconds to nanoseconds
+ # @param time_diff [Numeric] the (rational value) difference to convert
+ def to_nanos(time_diff)
+ (time_diff * 1_000_000).to_i
+ end
end # class LogStash::Inputs::Exec