lib/fluent/plugin/in_watch_process.rb in fluent-plugin-watch-process-0.0.2 vs lib/fluent/plugin/in_watch_process.rb in fluent-plugin-watch-process-0.0.3
- old
+ new
@@ -1,55 +1,43 @@
+require 'fluent/mixin/rewrite_tag_name'
+require 'fluent/mixin/type_converter'
+
module Fluent
class WatchProcessInput < Fluent::Input
Plugin.register_input('watch_process', self)
config_param :tag, :string
config_param :command, :string, :default => nil
config_param :keys, :string, :default => nil
- config_param :types, :string, :default => nil
config_param :interval, :string, :default => '5s'
config_param :lookup_user, :string, :default => nil
config_param :hostname_command, :string, :default => 'hostname'
DEFAULT_KEYS = %w(start_time user pid parent_pid cpu_time cpu_percent memory_percent mem_rss mem_size state proc_name command)
- DEFAULT_TYPES = %w(pid:integer parent_pid:integer cpu_percent:float memory_percent:float mem_rss:integer mem_size:integer)
+ DEFAULT_TYPES = "pid:integer,parent_pid:integer,cpu_percent:float,memory_percent:float,mem_rss:integer,mem_size:integer"
- Converters = {
- 'string' => lambda { |v| v.to_s },
- 'integer' => lambda { |v| v.to_i },
- 'float' => lambda { |v| v.to_f },
- 'bool' => lambda { |v|
- case v.downcase
- when 'true', 'yes', '1'
- true
- else
- false
- end
- },
- 'time' => lambda { |v, time_parser|
- time_parser.parse(v)
- },
- 'array' => lambda { |v, delimiter|
- v.to_s.split(delimiter)
- }
- }
+ include Fluent::HandleTagNameMixin
+ include Fluent::Mixin::RewriteTagName
+ config_set_default :enable_placeholder_upcase, true
+ config_set_default :enable_placeholder_hostname, true
+
+ include Fluent::Mixin::TypeConverter
+ config_param :types, :string, :default => DEFAULT_TYPES
+
def initialize
super
require 'time'
end
def configure(conf)
super
@command = @command || get_ps_command
@keys = @keys.nil? ? DEFAULT_KEYS : @keys.to_s.gsub(' ', '').split(',')
- @types = @types || DEFAULT_TYPES
- @types_map = Hash[types.map{|v| v.split(':')}]
@lookup_user = @lookup_user.gsub(' ', '').split(',') unless @lookup_user.nil?
@interval = Config.time_value(@interval)
- @hostname = `#{@hostname_command}`.chomp
$log.info "watch_process: polling start. :tag=>#{@tag} :lookup_user=>#{@lookup_user} :interval=>#{@interval} :command=>#{@command}"
end
def start
@thread = Thread.new(&method(:run))
@@ -63,26 +51,22 @@
loop do
io = IO.popen(@command, 'r')
io.gets
while result = io.gets
keys_size = @keys.size
- if result =~ /(?<lstart>(^\w+ \w+ \d+ \d\d:\d\d:\d\d \d+))/
+ if result =~ /(?<lstart>(^\w+\s+\w+\s+\d+\s+\d\d:\d\d:\d\d \d+))/
lstart = Time.parse($~[:lstart])
- values = result.sub($~[:lstart], '')
+ result = result.sub($~[:lstart], '')
keys_size -= 1
end
- values = values.chomp.strip.split(/\s+/, keys_size)
- data = Hash[
- @keys.zip([lstart.to_s, values].reject(&:empty?).flatten).map do |k,v|
- v = Converters[@types_map[k]].call(v) if @types_map.include?(k)
- [k,v]
- end
- ]
+ values = [lstart.to_s, result.chomp.strip.split(/\s+/, keys_size)]
+ data = Hash[@keys.zip(values.reject(&:empty?).flatten)]
data['elapsed_time'] = (Time.now - Time.parse(data['start_time'])).to_i if data['start_time']
next unless @lookup_user.nil? || @lookup_user.include?(data['user'])
- tag = @tag.gsub(/(\${[a-z]+}|__[A-Z]+__)/, get_placeholder)
- Engine.emit(tag, Engine.now, data)
+ emit_tag = tag.dup
+ filter_record(emit_tag, Engine.now, data)
+ Engine.emit(emit_tag, Engine.now, data)
end
io.close
sleep @interval
end
rescue StandardError => e
@@ -113,14 +97,7 @@
def OS.linux?
OS.unix? and not OS.mac?
end
end
-
- def get_placeholder
- return {
- '__HOSTNAME__' => @hostname,
- '${hostname}' => @hostname,
- }
- end
end
end