lib/fluent/plugin/in_watch_process.rb in fluent-plugin-watch-process-0.0.1 vs lib/fluent/plugin/in_watch_process.rb in fluent-plugin-watch-process-0.0.2

- old
+ new

@@ -8,10 +8,13 @@ config_param :types, :string, :default => nil config_param :interval, :string, :default => '5s' config_param :lookup_user, :string, :default => nil config_param :hostname_command, :string, :default => 'hostname' + DEFAULT_KEYS = %w(start_time user pid parent_pid cpu_time cpu_percent memory_percent mem_rss mem_size state proc_name command) + DEFAULT_TYPES = %w(pid:integer parent_pid:integer cpu_percent:float memory_percent:float mem_rss:integer mem_size:integer) + Converters = { 'string' => lambda { |v| v.to_s }, 'integer' => lambda { |v| v.to_i }, 'float' => lambda { |v| v.to_f }, 'bool' => lambda { |v| @@ -37,12 +40,12 @@ def configure(conf) super @command = @command || get_ps_command - @keys = @keys || %w(start_time user pid parent_pid cpu_time cpu_percent memory_percent mem_rss mem_size state proc_name command) - types = @types || %w(pid:integer parent_pid:integer cpu_percent:float memory_percent:float mem_rss:integer mem_size:integer) + @keys = @keys.nil? ? DEFAULT_KEYS : @keys.to_s.gsub(' ', '').split(',') + @types = @types || DEFAULT_TYPES @types_map = Hash[types.map{|v| v.split(':')}] @lookup_user = @lookup_user.gsub(' ', '').split(',') unless @lookup_user.nil? @interval = Config.time_value(@interval) @hostname = `#{@hostname_command}`.chomp $log.info "watch_process: polling start. :tag=>#{@tag} :lookup_user=>#{@lookup_user} :interval=>#{@interval} :command=>#{@command}" @@ -59,25 +62,32 @@ def run loop do io = IO.popen(@command, 'r') io.gets while result = io.gets - values = result.chomp.strip.split(/\s+/, @keys.size + 4) - time = Time.parse(values[0...5].join(' ')) + keys_size = @keys.size + if result =~ /(?<lstart>(^\w+ \w+ \d+ \d\d:\d\d:\d\d \d+))/ + lstart = Time.parse($~[:lstart]) + values = result.sub($~[:lstart], '') + keys_size -= 1 + end + values = values.chomp.strip.split(/\s+/, keys_size) data = Hash[ - @keys.zip([time.to_s, values.values_at(5..15)].flatten).map do |k,v| + @keys.zip([lstart.to_s, values].reject(&:empty?).flatten).map do |k,v| v = Converters[@types_map[k]].call(v) if @types_map.include?(k) [k,v] end ] - data['elapsed_time'] = (Time.now - Time.parse(data['start_time'])).to_i + data['elapsed_time'] = (Time.now - Time.parse(data['start_time'])).to_i if data['start_time'] next unless @lookup_user.nil? || @lookup_user.include?(data['user']) tag = @tag.gsub(/(\${[a-z]+}|__[A-Z]+__)/, get_placeholder) Engine.emit(tag, Engine.now, data) end io.close sleep @interval end + rescue StandardError => e + $log.error "watch_process: error has occured. #{e.message}" end def get_ps_command if OS.linux? "LANG=en_US.UTF-8 && ps -ewwo lstart,user:20,pid,ppid,time,%cpu,%mem,rss,sz,s,comm,cmd"