lib/fluent/plugin/in_dstat.rb in fluent-plugin-dstat-0.3.3 vs lib/fluent/plugin/in_dstat.rb in fluent-plugin-dstat-1.0.0

- old
+ new

@@ -1,12 +1,14 @@ -require 'fluent/input' +require 'fluent/plugin/input' -module Fluent +module Fluent::Plugin class DstatInput < Input - Plugin.register_input('dstat', self) + Fluent::Plugin.register_input('dstat', self) + helpers :timer, :event_loop + def initialize super require 'csv' @line_number = 0 @@ -22,10 +24,14 @@ def desc(description) end end end + unless method_defined?(:log) + define_method("log") { $log } + end + desc "the tag of event" config_param :tag, :string desc "dstat command path" config_param :dstat_path, :string, :default => "dstat" desc "dstat command line option" @@ -44,71 +50,58 @@ @hostname = `#{@hostname_command}`.chomp! begin `#{@dstat_path} --version` rescue Errno::ENOENT - raise ConfigError, "'#{@dstat_path}' command not found. Install dstat before run fluentd" + raise Fluent::ConfigError, "'#{@dstat_path}' command not found. Install dstat before run fluentd" end end def check_dstat now = Time.now if now - @last_time > @delay * 3 - $log.info "Process dstat(#{@pid}) is stopped. Last updated: #{@last_time}" + log.info "Process dstat(#{@pid}) is stopped. Last updated: #{@last_time}" restart end end def start + super system("mkfifo #{@tmp_file}") @io = IO.popen(@command, "r") @pid = @io.pid - @loop = Coolio::Loop.new @dw = DstatCSVWatcher.new(@tmp_file, &method(:receive_lines)) - @dw.attach(@loop) - @tw = TimerWatcher.new(1, true, &method(:check_dstat)) - @tw.attach(@loop) - @thread = Thread.new(&method(:run)) + event_loop_attach(@dw) + @tw = timer_execute(:in_dstat_timer, 1, &method(:check_dstat)) end def shutdown Process.kill(:TERM, @pid) @dw.detach @tw.detach - @loop.stop - @thread.join File.delete(@tmp_file) + super end - def run - begin - @loop.run - rescue - $log.error "unexpected error", :error=>$!.to_s - $log.error_backtrace - end - end - def restart Process.detach(@pid) begin Process.kill(:TERM, @pid) rescue Errno::ESRCH => e - $log.error "unexpected death of a child process", :error=>e.to_s - $log.error_backtrace + log.error "unexpected death of a child process", :error=>e.to_s + log.error_backtrace end @dw.detach @tw.detach @line_number = 0 @io = IO.popen(@command, "r") @pid = @io.pid @dw = DstatCSVWatcher.new(@tmp_file, &method(:receive_lines)) - @dw.attach(@loop) - @tw = TimerWatcher.new(1, true, &method(:check_dstat)) - @tw.attach(@loop) + event_loop_attach(@dw) + @tw = timer_execute(:in_dstat_timer, 1, &method(:check_dstat)) end def receive_lines(lines) lines.each do |line| next if line == "" @@ -142,11 +135,11 @@ end record = { 'hostname' => @hostname, 'dstat' => data } - router.emit(@tag, Engine.now, record) + router.emit(@tag, Fluent::Engine.now, record) end @line_number += 1 @last_time = Time.now end @@ -171,23 +164,9 @@ lines[0] = @partial + lines.first unless @partial.empty? @partial = buffer.end_with?("\n") ? "" : lines.pop @receive_lines.call(lines) rescue IO::WaitReadable # will be readable on next event - end - end - - class TimerWatcher < Cool.io::TimerWatcher - def initialize(interval, repeat, &check_dstat) - @check_dstat = check_dstat - super(interval, repeat) - end - - def on_timer - @check_dstat.call - rescue - $log.error $!.to_s - $log.error_backtrace end end end end