lib/logstash/inputs/exec.rb in logstash-input-exec-3.3.3 vs lib/logstash/inputs/exec.rb in logstash-input-exec-3.4.0

- old
+ new

@@ -3,19 +3,23 @@ require "logstash/namespace" require "socket" # for Socket.gethostname require "stud/interval" require "rufus/scheduler" +require 'logstash/plugin_mixins/ecs_compatibility_support' + # Periodically run a shell command and capture the whole output as an event. # # Notes: # # * The `command` field of this event will be the command run. # * The `message` field of this event will be the entire stdout of the command. # class LogStash::Inputs::Exec < LogStash::Inputs::Base + include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) + config_name "exec" default :codec, "plain" # Command to run. For example : `uptime` @@ -29,17 +33,24 @@ # For example: "* * * * *" (execute command every minute, on the minute) # Either `interval` or `schedule` option must be defined. config :schedule, :validate => :string def register - @logger.info("Registering Exec Input", :type => @type, :command => @command, :interval => @interval, :schedule => @schedule) - @hostname = Socket.gethostname + @hostname = Socket.gethostname.freeze @io = nil if (@interval.nil? && @schedule.nil?) || (@interval && @schedule) raise LogStash::ConfigurationError, "exec input: either 'interval' or 'schedule' option must be defined." end + + @host_name_field = ecs_select[disabled: 'host', v1: '[host][name]'] + @process_command_line_field = ecs_select[disabled: 'command', v1: '[process][command_line]'] + @process_exit_code_field = ecs_select[disabled: '[@metadata][exit_status]', v1: '[process][exit_code]'] + + # migrate elapsed time tracking to whole nanos, from legacy floating-point fractional seconds + @process_elapsed_time_field = ecs_select[disabled: nil, v1: '[@metadata][input][exec][process][elapsed_time]'] # in nanos + @legacy_duration_field = ecs_select[disabled: '[@metadata][duration]', v1: nil] # in seconds end # def register def run(queue) if @schedule @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) @@ -59,34 +70,34 @@ close_io() @scheduler.shutdown(:wait) if @scheduler end # Execute a given command - # @param [String] A command string - # @param [Array or Queue] A queue to append events to + # @param queue the LS queue to append events to def execute(queue) start = Time.now output = exit_status = nil begin @logger.debug? && @logger.debug("Running exec", :command => @command) output, exit_status = run_command() rescue StandardError => e @logger.error("Error while running command", - :command => @command, :e => e, :backtrace => e.backtrace) + :command => @command, :exception => e, :backtrace => e.backtrace) rescue Exception => e @logger.error("Exception while running command", - :command => @command, :e => e, :backtrace => e.backtrace) + :command => @command, :exception => e, :backtrace => e.backtrace) end - duration = Time.now - start - @logger.debug? && @logger.debug("Command completed", :command => @command, :duration => duration) + duration = Time.now.to_r - start.to_r + @logger.debug? && @logger.debug("Command completed", :command => @command, :duration => duration.to_f) if output @codec.decode(output) do |event| decorate(event) - event.set("host", @hostname) - event.set("command", @command) - event.set("[@metadata][duration]", duration) - event.set("[@metadata][exit_status]", exit_status) + event.set(@host_name_field, @hostname) unless event.include?(@host_name_field) + event.set(@process_command_line_field, @command) unless event.include?(@process_command_line_field) + event.set(@process_exit_code_field, exit_status) unless event.include?(@process_exit_code_field) + event.set(@process_elapsed_time_field, to_nanos(duration)) if @process_elapsed_time_field + event.set(@legacy_duration_field, duration.to_f) if @legacy_duration_field queue << event end end duration end @@ -95,11 +106,11 @@ def run_command @io = IO.popen(@command) output = @io.read @io.close # required in order to read $? - exit_status = $?.exitstatus # should be threadsafe as per rb_thread_save_context + exit_status = $?.exitstatus [output, exit_status] ensure close_io() end @@ -109,11 +120,11 @@ @io.close @io = nil end # Wait until the end of the interval - # @param [Integer] the duration of the last command executed + # @param duration [Integer] the duration of the last command executed def wait_until_end_of_interval(duration) # Sleep for the remainder of the interval, or 0 if the duration ran # longer than the interval. sleeptime = [0, @interval - duration].max if sleeptime > 0 @@ -122,7 +133,12 @@ @logger.warn("Execution ran longer than the interval. Skipping sleep.", :command => @command, :duration => duration, :interval => @interval) end end + # convert seconds to nanoseconds + # @param time_diff [Numeric] the (rational value) difference to convert + def to_nanos(time_diff) + (time_diff * 1_000_000).to_i + end end # class LogStash::Inputs::Exec