lib/fluent/plugin/in_dstat.rb in fluent-plugin-dstat-0.3.3 vs lib/fluent/plugin/in_dstat.rb in fluent-plugin-dstat-1.0.0
- old
+ new
@@ -1,12 +1,14 @@
-require 'fluent/input'
+require 'fluent/plugin/input'
-module Fluent
+module Fluent::Plugin
class DstatInput < Input
- Plugin.register_input('dstat', self)
+ Fluent::Plugin.register_input('dstat', self)
+ helpers :timer, :event_loop
+
def initialize
super
require 'csv'
@line_number = 0
@@ -22,10 +24,14 @@
def desc(description)
end
end
end
+ unless method_defined?(:log)
+ define_method("log") { $log }
+ end
+
desc "the tag of event"
config_param :tag, :string
desc "dstat command path"
config_param :dstat_path, :string, :default => "dstat"
desc "dstat command line option"
@@ -44,71 +50,58 @@
@hostname = `#{@hostname_command}`.chomp!
begin
`#{@dstat_path} --version`
rescue Errno::ENOENT
- raise ConfigError, "'#{@dstat_path}' command not found. Install dstat before run fluentd"
+ raise Fluent::ConfigError, "'#{@dstat_path}' command not found. Install dstat before run fluentd"
end
end
def check_dstat
now = Time.now
if now - @last_time > @delay * 3
- $log.info "Process dstat(#{@pid}) is stopped. Last updated: #{@last_time}"
+ log.info "Process dstat(#{@pid}) is stopped. Last updated: #{@last_time}"
restart
end
end
def start
+ super
system("mkfifo #{@tmp_file}")
@io = IO.popen(@command, "r")
@pid = @io.pid
- @loop = Coolio::Loop.new
@dw = DstatCSVWatcher.new(@tmp_file, &method(:receive_lines))
- @dw.attach(@loop)
- @tw = TimerWatcher.new(1, true, &method(:check_dstat))
- @tw.attach(@loop)
- @thread = Thread.new(&method(:run))
+ event_loop_attach(@dw)
+ @tw = timer_execute(:in_dstat_timer, 1, &method(:check_dstat))
end
def shutdown
Process.kill(:TERM, @pid)
@dw.detach
@tw.detach
- @loop.stop
- @thread.join
File.delete(@tmp_file)
+ super
end
- def run
- begin
- @loop.run
- rescue
- $log.error "unexpected error", :error=>$!.to_s
- $log.error_backtrace
- end
- end
-
def restart
Process.detach(@pid)
begin
Process.kill(:TERM, @pid)
rescue Errno::ESRCH => e
- $log.error "unexpected death of a child process", :error=>e.to_s
- $log.error_backtrace
+ log.error "unexpected death of a child process", :error=>e.to_s
+ log.error_backtrace
end
@dw.detach
@tw.detach
@line_number = 0
@io = IO.popen(@command, "r")
@pid = @io.pid
@dw = DstatCSVWatcher.new(@tmp_file, &method(:receive_lines))
- @dw.attach(@loop)
- @tw = TimerWatcher.new(1, true, &method(:check_dstat))
- @tw.attach(@loop)
+ event_loop_attach(@dw)
+ @tw = timer_execute(:in_dstat_timer, 1, &method(:check_dstat))
end
def receive_lines(lines)
lines.each do |line|
next if line == ""
@@ -142,11 +135,11 @@
end
record = {
'hostname' => @hostname,
'dstat' => data
}
- router.emit(@tag, Engine.now, record)
+ router.emit(@tag, Fluent::Engine.now, record)
end
@line_number += 1
@last_time = Time.now
end
@@ -171,23 +164,9 @@
lines[0] = @partial + lines.first unless @partial.empty?
@partial = buffer.end_with?("\n") ? "" : lines.pop
@receive_lines.call(lines)
rescue IO::WaitReadable
# will be readable on next event
- end
- end
-
- class TimerWatcher < Cool.io::TimerWatcher
- def initialize(interval, repeat, &check_dstat)
- @check_dstat = check_dstat
- super(interval, repeat)
- end
-
- def on_timer
- @check_dstat.call
- rescue
- $log.error $!.to_s
- $log.error_backtrace
end
end
end
end