lib/fluent/plugin/in_watch_process.rb in fluent-plugin-watch-process-0.0.2 vs lib/fluent/plugin/in_watch_process.rb in fluent-plugin-watch-process-0.0.3

- old
+ new

@@ -1,55 +1,43 @@ +require 'fluent/mixin/rewrite_tag_name' +require 'fluent/mixin/type_converter' + module Fluent class WatchProcessInput < Fluent::Input Plugin.register_input('watch_process', self) config_param :tag, :string config_param :command, :string, :default => nil config_param :keys, :string, :default => nil - config_param :types, :string, :default => nil config_param :interval, :string, :default => '5s' config_param :lookup_user, :string, :default => nil config_param :hostname_command, :string, :default => 'hostname' DEFAULT_KEYS = %w(start_time user pid parent_pid cpu_time cpu_percent memory_percent mem_rss mem_size state proc_name command) - DEFAULT_TYPES = %w(pid:integer parent_pid:integer cpu_percent:float memory_percent:float mem_rss:integer mem_size:integer) + DEFAULT_TYPES = "pid:integer,parent_pid:integer,cpu_percent:float,memory_percent:float,mem_rss:integer,mem_size:integer" - Converters = { - 'string' => lambda { |v| v.to_s }, - 'integer' => lambda { |v| v.to_i }, - 'float' => lambda { |v| v.to_f }, - 'bool' => lambda { |v| - case v.downcase - when 'true', 'yes', '1' - true - else - false - end - }, - 'time' => lambda { |v, time_parser| - time_parser.parse(v) - }, - 'array' => lambda { |v, delimiter| - v.to_s.split(delimiter) - } - } + include Fluent::HandleTagNameMixin + include Fluent::Mixin::RewriteTagName + config_set_default :enable_placeholder_upcase, true + config_set_default :enable_placeholder_hostname, true + + include Fluent::Mixin::TypeConverter + config_param :types, :string, :default => DEFAULT_TYPES + def initialize super require 'time' end def configure(conf) super @command = @command || get_ps_command @keys = @keys.nil? ? DEFAULT_KEYS : @keys.to_s.gsub(' ', '').split(',') - @types = @types || DEFAULT_TYPES - @types_map = Hash[types.map{|v| v.split(':')}] @lookup_user = @lookup_user.gsub(' ', '').split(',') unless @lookup_user.nil? @interval = Config.time_value(@interval) - @hostname = `#{@hostname_command}`.chomp $log.info "watch_process: polling start. :tag=>#{@tag} :lookup_user=>#{@lookup_user} :interval=>#{@interval} :command=>#{@command}" end def start @thread = Thread.new(&method(:run)) @@ -63,26 +51,22 @@ loop do io = IO.popen(@command, 'r') io.gets while result = io.gets keys_size = @keys.size - if result =~ /(?<lstart>(^\w+ \w+ \d+ \d\d:\d\d:\d\d \d+))/ + if result =~ /(?<lstart>(^\w+\s+\w+\s+\d+\s+\d\d:\d\d:\d\d \d+))/ lstart = Time.parse($~[:lstart]) - values = result.sub($~[:lstart], '') + result = result.sub($~[:lstart], '') keys_size -= 1 end - values = values.chomp.strip.split(/\s+/, keys_size) - data = Hash[ - @keys.zip([lstart.to_s, values].reject(&:empty?).flatten).map do |k,v| - v = Converters[@types_map[k]].call(v) if @types_map.include?(k) - [k,v] - end - ] + values = [lstart.to_s, result.chomp.strip.split(/\s+/, keys_size)] + data = Hash[@keys.zip(values.reject(&:empty?).flatten)] data['elapsed_time'] = (Time.now - Time.parse(data['start_time'])).to_i if data['start_time'] next unless @lookup_user.nil? || @lookup_user.include?(data['user']) - tag = @tag.gsub(/(\${[a-z]+}|__[A-Z]+__)/, get_placeholder) - Engine.emit(tag, Engine.now, data) + emit_tag = tag.dup + filter_record(emit_tag, Engine.now, data) + Engine.emit(emit_tag, Engine.now, data) end io.close sleep @interval end rescue StandardError => e @@ -113,14 +97,7 @@ def OS.linux? OS.unix? and not OS.mac? end end - - def get_placeholder - return { - '__HOSTNAME__' => @hostname, - '${hostname}' => @hostname, - } - end end end