lib/fluent/plugin/in_tail.rb in fluentd-0.10.11 vs lib/fluent/plugin/in_tail.rb in fluentd-0.10.12

- old
+ new

@@ -26,10 +26,11 @@ @paths = [] end config_param :path, :string config_param :tag, :string + config_param :rotate_wait, :time, :default => 5 config_param :pos_file, :string, :default => nil def configure(conf) super @@ -55,19 +56,14 @@ @parser.configure(conf) end def start @loop = Coolio::Loop.new - handlers = @paths.map {|path| - $log.debug "following tail of #{path}" + @tailers = @paths.map {|path| pe = @pf ? @pf[path] : NullPositionEntry.instance - h = Handler.new(path, pe, method(:receive_lines)) - @loop.attach h + Tailer.new(@loop, path, @rotate_wait, pe, method(:receive_lines)) } - handlers.each {|h| - h.on_change(nil, nil) # initialize call # TODO prev, cur - } @thread = Thread.new(&method(:run)) end def shutdown @loop.watchers.each {|w| w.detach } @@ -105,118 +101,230 @@ def parse_line(line) return @parser.parse(line) end - class Handler < Coolio::StatWatcher - def initialize(path, pe, callback) - stat = File.stat(path) + class Tailer + def initialize(loop, path, rotate_wait, pe, receive_lines) + @loop = loop + @path = path + @rotate_wait = rotate_wait @pe = pe - @inode = stat.ino - if @inode == @pe.read_inode - # seek to the saved position - @pos = @pe.read_pos - else - # seek to the end of file first. - # logs never duplicate but may be lost if fluent is down. - @pos = stat.size - @pe.update(@inode, @pos) + @receive_lines = receive_lines + + @rotate_queue = [] + @rotate_timer = nil + @io_handler = nil + + @rh = RotateHandler.new(path, method(:rotate)) + @rh.check # invoke rotate + @rh.on_rotate = method(:on_rotate) + @rh.attach(@loop) + end + + def on_rotate(io) + return if @rotate_queue.include?(io) + $log.info "detected rotation of #{@path}; waiting #{@rotate_wait} seconds" + @rotate_queue.push(io) + + # start rotate_timer + unless @rotate_timer + @rotate_timer = RotateTimer.new(@rotate_wait, method(:on_rotate_timer)) + @rotate_timer.attach(@loop) end - @buffer = '' - @callback = callback - super(path) end - def on_change(prev, cur) - lines = [] - inode = nil + def on_rotate_timer + io = @rotate_queue.first + rotate(io, 0) + end - File.open(path) {|f| - stat = f.stat + def rotate(io, start_pos=nil) + # start_pos is nil if first + io_handler = IOHandler.new(io, start_pos, @pe, @receive_lines) + + if @io_handler + @io_handler.close + @io_handler = nil + end + io_handler.attach(@loop) + @io_handler = io_handler + @rotate_queue.shift + + if @rotate_queue.empty? + @rotate_timer.detach if @rotate_timer + @rotate_timer = nil + end + end + + def shutdown + @rotate_queue.reject! {|io| + io.close + true + } + if @io_handler + @io_handler.close + @io_handler = nil + end + if @rotate_timer + @rotate_timer.detach + @rotate_timer = nil + end + end + end + + class RotateHandler + def initialize(path, on_rotate) + @path = path + @inode = nil + @fsize = 0 + @on_rotate = on_rotate + @path = path + @stat_watcher = Stat.new(self, @path) + @timer_watcher = Timer.new(self, 1) + end + + attr_accessor :on_rotate + + def check + begin + io = File.open(@path) + rescue Errno::ENOENT + # moved or deleted + @inode = nil + @fsize = 0 + return + end + + begin + stat = io.stat inode = stat.ino + fsize = stat.size - if @inode != inode || stat.size < @pos - # moved or deleted - @pos = 0 - else - f.seek(@pos) + if @inode != inode || fsize < @fsize + # rotated or truncated + @on_rotate.call(io) + io = nil end - line = f.gets - unless line - break - end + @inode = inode + @fsize = fsize + ensure + io.close if io + end - @buffer << line - unless line[line.length-1] == ?\n - @pos = f.pos - break - end + rescue + $log.error $!.to_s + $log.error_backtrace + end - lines << @buffer - @buffer = '' + def attach(loop) + @stat_watcher.attach(loop) + @timer_watcher.attach(loop) + end - while line = f.gets - unless line[line.length-1] == ?\n - @buffer = line - break - end - lines << line - end + def detach + @stat_watcher.detach + @timer_watcher.detach + end - @pos = f.pos - } + def attached? + @stat_watcher.attached? + end - if @inode != inode - @pe.update(inode, @pos) - @inode = inode - else - @pe.update_pos(@pos) + class Stat < Coolio::StatWatcher + def initialize(h, path) + @h = h + super(path) end - @callback.call(lines) unless lines.empty? + def on_change(prev, cur) + @h.check + end + end - rescue Errno::ENOENT - # moved or deleted - @pos = 0 - # TODO rescue + class Timer < Coolio::TimerWatcher + def initialize(h, interval) + @h = h + super(interval, true) + end + + def on_timer + @h.check + end end end - # pos inode - # ffffffffffffffff\tffffffff\n - class PositionEntry - POS_SIZE = 16 - INO_OFFSET = 17 - INO_SIZE = 8 - LN_OFFSET = 25 - SIZE = 26 - - def initialize(file, seek) - @file = file - @seek = seek + class RotateTimer < Coolio::TimerWatcher + def initialize(interval, callback) + super(interval, true) + @callback = callback end - def update(ino, pos) - @file.pos = @seek - @file.write "%016x\t%08x" % [pos, ino] - @inode = ino + def on_timer + @callback.call + rescue + # TODO log? end + end - def update_pos(pos) - @file.pos = @seek - @file.write "%016x" % pos + class IOHandler < Coolio::IOWatcher + def initialize(io, start_pos, pe, receive_lines) + $log.info "following tail of #{io.path}" + @io = io + @pe = pe + @receive_lines = receive_lines + + if start_pos + # rotated + @pos = start_pos + + else + # first time + stat = io.stat + fsize = stat.size + inode = stat.ino + if inode == @pe.read_inode + # seek to the saved position + @pos = @pe.read_pos + else + # seek to the end of the file. + # logs never duplicate but may be lost if fluentd is down. + @pos = fsize + @pe.update(inode, @pos) + end + end + + io.seek(@pos) + + @buffer = '' + super(io) end - def read_inode - @file.pos = @seek + INO_OFFSET - @file.read(8).to_i(16) + def on_readable + lines = [] + + while line = @io.gets + @buffer << line + @pos = @io.pos + unless @buffer[@buffer.length-1] == ?\n + break + end + lines << line + end + + @pe.update_pos(@pos) + @receive_lines.call(lines) unless lines.empty? + rescue + $log.error $!.to_s + $log.error_backtrace + close end - def read_pos - @file.pos = @seek - @file.read(16).to_i(16) + def close + detach if attached? + @io.close unless @io.closed? end end class PositionFile def initialize(file, map, last_pos) @@ -251,9 +359,45 @@ ino = m[3].to_i(16) seek = file.pos - line.bytesize + path.bytesize + 1 map[path] = PositionEntry.new(file, seek) } new(file, map, file.pos) + end + end + + # pos inode + # ffffffffffffffff\tffffffff\n + class PositionEntry + POS_SIZE = 16 + INO_OFFSET = 17 + INO_SIZE = 8 + LN_OFFSET = 25 + SIZE = 26 + + def initialize(file, seek) + @file = file + @seek = seek + end + + def update(ino, pos) + @file.pos = @seek + @file.write "%016x\t%08x" % [pos, ino] + @inode = ino + end + + def update_pos(pos) + @file.pos = @seek + @file.write "%016x" % pos + end + + def read_inode + @file.pos = @seek + INO_OFFSET + @file.read(8).to_i(16) + end + + def read_pos + @file.pos = @seek + @file.read(16).to_i(16) end end class NullPositionEntry require 'singleton'