lib/fluent/plugin/in_dstat.rb in fluent-plugin-dstat-0.2.3 vs lib/fluent/plugin/in_dstat.rb in fluent-plugin-dstat-0.2.4

- old
+ new

@@ -1,176 +1,183 @@ +require 'fluent/mixin/rewrite_tag_name' + module Fluent + class DstatInput < Input -class DstatInput < Input + Plugin.register_input('dstat', self) - Plugin.register_input('dstat', self) + def initialize + super - def initialize - super - require 'csv' - @line_number = 0 - @first_keys = [] - @second_keys = [] - @data_array = [] - @max_lines = 100 - @last_time = Time.now - end + require 'csv' + @line_number = 0 + @first_keys = [] + @second_keys = [] + @data_array = [] + @max_lines = 100 + @last_time = Time.now + end - config_param :tag, :string - config_param :option, :string, :default => "-fcdnm" - config_param :delay, :integer, :default => 1 - config_param :tmp_file, :string, :default => "/tmp/dstat.csv" - config_param :hostname_command, :string, :default => "hostname" + config_param :tag, :string + config_param :dstat_path, :string, :default => "dstat" + config_param :option, :string, :default => "-fcdnm" + config_param :delay, :integer, :default => 1 + config_param :tmp_file, :string, :default => "/tmp/dstat.csv" + config_param :hostname_command, :string, :default => "hostname" - def configure(conf) - super - @command = "dstat #{@option} --output #{@tmp_file} #{@delay}" - @hostname = `#{@hostname_command}`.chomp! - end + include Fluent::Mixin::RewriteTagName - def check_dstat - restart if (Time.now - @last_time) > @delay*3 - end + def configure(conf) + super - def start - touch_or_truncate(@tmp_file) - @io = IO.popen(@command, "r") - @pid = @io.pid + @command = "#{@dstat_path} #{@option} --output #{@tmp_file} #{@delay}" + @hostname = `#{@hostname_command}`.chomp! + end - @loop = Coolio::Loop.new - @dw = DstatCSVWatcher.new(@tmp_file, &method(:receive_lines)) - @dw.attach(@loop) - @tw = TimerWatcher.new(1, true, &method(:check_dstat)) - @tw.attach(@loop) - @thread = Thread.new(&method(:run)) - end + def check_dstat + restart if (Time.now - @last_time) > @delay*3 + end - def shutdown - Process.kill(:TERM, @pid) - @dw.detach - @tw.detach - @loop.stop - @thread.join - File.delete(@tmp_file) - end + def start + touch_or_truncate(@tmp_file) + @io = IO.popen(@command, "r") + @pid = @io.pid - def run - begin - @loop.run - rescue - $log.error "unexpected error", :error=>$!.to_s - $log.error_backtrace + @loop = Coolio::Loop.new + @dw = DstatCSVWatcher.new(@tmp_file, &method(:receive_lines)) + @dw.attach(@loop) + @tw = TimerWatcher.new(1, true, &method(:check_dstat)) + @tw.attach(@loop) + @thread = Thread.new(&method(:run)) end - end - def restart - Process.detach(@pid) - Process.kill(:TERM, @pid) - @dw.detach - @tw.detach - @line_number = 0 - touch_or_truncate(@tmp_file) + def shutdown + Process.kill(:TERM, @pid) + @dw.detach + @tw.detach + @loop.stop + @thread.join + File.delete(@tmp_file) + end - @io = IO.popen(@command, "r") - @pid = @io.pid - @dw = DstatCSVWatcher.new(@tmp_file, &method(:receive_lines)) - @dw.attach(@loop) - @tw = TimerWatcher.new(1, true, &method(:check_dstat)) - @tw.attach(@loop) - end + def run + begin + @loop.run + rescue + $log.error "unexpected error", :error=>$!.to_s + $log.error_backtrace + end + end - def touch_or_truncate(file) - if File.exist?(file) - File.truncate(file, 0) - else - `touch #{file}` + def restart + Process.detach(@pid) + Process.kill(:TERM, @pid) + @dw.detach + @tw.detach + @line_number = 0 + touch_or_truncate(@tmp_file) + + @io = IO.popen(@command, "r") + @pid = @io.pid + @dw = DstatCSVWatcher.new(@tmp_file, &method(:receive_lines)) + @dw.attach(@loop) + @tw = TimerWatcher.new(1, true, &method(:check_dstat)) + @tw.attach(@loop) end - end - def receive_lines(lines) - lines.each do |line| - next if line == "" - case @line_number - when 0..1 - when 2 - line.delete!("\"") - @first_keys = CSV.parse_line(line) - pre_key = "" - @first_keys.each_with_index do |key, index| - if key.nil? || key == "" - @first_keys[index] = pre_key + def touch_or_truncate(file) + if File.exist?(file) + File.truncate(file, 0) + else + `touch #{file}` + end + end + + def receive_lines(lines) + lines.each do |line| + next if line == "" + case @line_number + when 0..1 + when 2 + line.delete!("\"") + @first_keys = CSV.parse_line(line) + pre_key = "" + @first_keys.each_with_index do |key, index| + if key.nil? || key == "" + @first_keys[index] = pre_key + end + pre_key = @first_keys[index] end - pre_key = @first_keys[index] + when 3 + line.delete!("\"") + @second_keys = line.split(',') + @first_keys.each_with_index do |key, index| + @data_array[index] = {} + @data_array[index][:first] = key + @data_array[index][:second] = @second_keys[index] + end + else + values = line.split(',') + data = Hash.new { |hash,key| hash[key] = Hash.new {} } + values.each_with_index do |v, index| + data[@first_keys[index]][@second_keys[index]] = v + end + record = { + 'hostname' => @hostname, + 'dstat' => data + } + emit_tag = @tag.dup + filter_record(emit_tag, Engine.now, record) + Engine.emit(emit_tag, Engine.now, record) end - when 3 - line.delete!("\"") - @second_keys = line.split(',') - @first_keys.each_with_index do |key, index| - @data_array[index] = {} - @data_array[index][:first] = key - @data_array[index][:second] = @second_keys[index] + + if (@line_number % @max_lines) == (@max_lines - 1) + @dw.detach + File.truncate(@tmp_file, 0) + @dw = DstatCSVWatcher.new(@tmp_file, &method(:receive_lines)) + @dw.attach(@loop) end - else - values = line.split(',') - data = Hash.new { |hash,key| hash[key] = Hash.new {} } - values.each_with_index do |v, index| - data[@first_keys[index]][@second_keys[index]] = v - end - record = { - 'hostname' => @hostname, - 'dstat' => data - } - Engine.emit(@tag, Engine.now, record) - end - if (@line_number % @max_lines) == (@max_lines - 1) - @dw.detach - File.truncate(@tmp_file, 0) - @dw = DstatCSVWatcher.new(@tmp_file, &method(:receive_lines)) - @dw.attach(@loop) + @line_number += 1 + @last_time = Time.now end - @line_number += 1 - @last_time = Time.now end - end + class DstatCSVWatcher < Cool.io::StatWatcher + INTERVAL = 0.500 + attr_accessor :previous, :cur - class DstatCSVWatcher < Cool.io::StatWatcher - INTERVAL = 0.500 - attr_accessor :previous, :cur + def initialize(path, &receive_lines) + super path, INTERVAL + @path = path + @io = File.open(path) + @pos = 0 + @receive_lines = receive_lines + end - def initialize(path, &receive_lines) - super path, INTERVAL - @path = path - @io = File.open(path) - @pos = 0 - @receive_lines = receive_lines + def on_change(prev, cur) + buffer = @io.read(cur.size - @pos) + @pos = cur.size + lines = [] + while line = buffer.slice!(/.*?\n/m) + lines << line.chomp + end + @receive_lines.call(lines) + end end - def on_change(prev, cur) - buffer = @io.read(cur.size - @pos) - @pos = cur.size - lines = [] - while line = buffer.slice!(/.*?\n/m) - lines << line.chomp + class TimerWatcher < Coolio::TimerWatcher + def initialize(interval, repeat, &check_dstat) + @check_dstat = check_dstat + super(interval, repeat) end - @receive_lines.call(lines) - end - end - class TimerWatcher < Coolio::TimerWatcher - def initialize(interval, repeat, &check_dstat) - @check_dstat = check_dstat - super(interval, repeat) - end - def on_timer - @check_dstat.call - rescue - $log.error $!.to_s - $log.error_backtrace + def on_timer + @check_dstat.call + rescue + $log.error $!.to_s + $log.error_backtrace + end end end -end - - end