lib/fluent/plugin/in_tail.rb in fluentd-0.10.11 vs lib/fluent/plugin/in_tail.rb in fluentd-0.10.12
- old
+ new
@@ -26,10 +26,11 @@
@paths = []
end
config_param :path, :string
config_param :tag, :string
+ config_param :rotate_wait, :time, :default => 5
config_param :pos_file, :string, :default => nil
def configure(conf)
super
@@ -55,19 +56,14 @@
@parser.configure(conf)
end
def start
@loop = Coolio::Loop.new
- handlers = @paths.map {|path|
- $log.debug "following tail of #{path}"
+ @tailers = @paths.map {|path|
pe = @pf ? @pf[path] : NullPositionEntry.instance
- h = Handler.new(path, pe, method(:receive_lines))
- @loop.attach h
+ Tailer.new(@loop, path, @rotate_wait, pe, method(:receive_lines))
}
- handlers.each {|h|
- h.on_change(nil, nil) # initialize call # TODO prev, cur
- }
@thread = Thread.new(&method(:run))
end
def shutdown
@loop.watchers.each {|w| w.detach }
@@ -105,118 +101,230 @@
def parse_line(line)
return @parser.parse(line)
end
- class Handler < Coolio::StatWatcher
- def initialize(path, pe, callback)
- stat = File.stat(path)
+ class Tailer
+ def initialize(loop, path, rotate_wait, pe, receive_lines)
+ @loop = loop
+ @path = path
+ @rotate_wait = rotate_wait
@pe = pe
- @inode = stat.ino
- if @inode == @pe.read_inode
- # seek to the saved position
- @pos = @pe.read_pos
- else
- # seek to the end of file first.
- # logs never duplicate but may be lost if fluent is down.
- @pos = stat.size
- @pe.update(@inode, @pos)
+ @receive_lines = receive_lines
+
+ @rotate_queue = []
+ @rotate_timer = nil
+ @io_handler = nil
+
+ @rh = RotateHandler.new(path, method(:rotate))
+ @rh.check # invoke rotate
+ @rh.on_rotate = method(:on_rotate)
+ @rh.attach(@loop)
+ end
+
+ def on_rotate(io)
+ return if @rotate_queue.include?(io)
+ $log.info "detected rotation of #{@path}; waiting #{@rotate_wait} seconds"
+ @rotate_queue.push(io)
+
+ # start rotate_timer
+ unless @rotate_timer
+ @rotate_timer = RotateTimer.new(@rotate_wait, method(:on_rotate_timer))
+ @rotate_timer.attach(@loop)
end
- @buffer = ''
- @callback = callback
- super(path)
end
- def on_change(prev, cur)
- lines = []
- inode = nil
+ def on_rotate_timer
+ io = @rotate_queue.first
+ rotate(io, 0)
+ end
- File.open(path) {|f|
- stat = f.stat
+ def rotate(io, start_pos=nil)
+ # start_pos is nil if first
+ io_handler = IOHandler.new(io, start_pos, @pe, @receive_lines)
+
+ if @io_handler
+ @io_handler.close
+ @io_handler = nil
+ end
+ io_handler.attach(@loop)
+ @io_handler = io_handler
+ @rotate_queue.shift
+
+ if @rotate_queue.empty?
+ @rotate_timer.detach if @rotate_timer
+ @rotate_timer = nil
+ end
+ end
+
+ def shutdown
+ @rotate_queue.reject! {|io|
+ io.close
+ true
+ }
+ if @io_handler
+ @io_handler.close
+ @io_handler = nil
+ end
+ if @rotate_timer
+ @rotate_timer.detach
+ @rotate_timer = nil
+ end
+ end
+ end
+
+ class RotateHandler
+ def initialize(path, on_rotate)
+ @path = path
+ @inode = nil
+ @fsize = 0
+ @on_rotate = on_rotate
+ @path = path
+ @stat_watcher = Stat.new(self, @path)
+ @timer_watcher = Timer.new(self, 1)
+ end
+
+ attr_accessor :on_rotate
+
+ def check
+ begin
+ io = File.open(@path)
+ rescue Errno::ENOENT
+ # moved or deleted
+ @inode = nil
+ @fsize = 0
+ return
+ end
+
+ begin
+ stat = io.stat
inode = stat.ino
+ fsize = stat.size
- if @inode != inode || stat.size < @pos
- # moved or deleted
- @pos = 0
- else
- f.seek(@pos)
+ if @inode != inode || fsize < @fsize
+ # rotated or truncated
+ @on_rotate.call(io)
+ io = nil
end
- line = f.gets
- unless line
- break
- end
+ @inode = inode
+ @fsize = fsize
+ ensure
+ io.close if io
+ end
- @buffer << line
- unless line[line.length-1] == ?\n
- @pos = f.pos
- break
- end
+ rescue
+ $log.error $!.to_s
+ $log.error_backtrace
+ end
- lines << @buffer
- @buffer = ''
+ def attach(loop)
+ @stat_watcher.attach(loop)
+ @timer_watcher.attach(loop)
+ end
- while line = f.gets
- unless line[line.length-1] == ?\n
- @buffer = line
- break
- end
- lines << line
- end
+ def detach
+ @stat_watcher.detach
+ @timer_watcher.detach
+ end
- @pos = f.pos
- }
+ def attached?
+ @stat_watcher.attached?
+ end
- if @inode != inode
- @pe.update(inode, @pos)
- @inode = inode
- else
- @pe.update_pos(@pos)
+ class Stat < Coolio::StatWatcher
+ def initialize(h, path)
+ @h = h
+ super(path)
end
- @callback.call(lines) unless lines.empty?
+ def on_change(prev, cur)
+ @h.check
+ end
+ end
- rescue Errno::ENOENT
- # moved or deleted
- @pos = 0
- # TODO rescue
+ class Timer < Coolio::TimerWatcher
+ def initialize(h, interval)
+ @h = h
+ super(interval, true)
+ end
+
+ def on_timer
+ @h.check
+ end
end
end
- # pos inode
- # ffffffffffffffff\tffffffff\n
- class PositionEntry
- POS_SIZE = 16
- INO_OFFSET = 17
- INO_SIZE = 8
- LN_OFFSET = 25
- SIZE = 26
-
- def initialize(file, seek)
- @file = file
- @seek = seek
+ class RotateTimer < Coolio::TimerWatcher
+ def initialize(interval, callback)
+ super(interval, true)
+ @callback = callback
end
- def update(ino, pos)
- @file.pos = @seek
- @file.write "%016x\t%08x" % [pos, ino]
- @inode = ino
+ def on_timer
+ @callback.call
+ rescue
+ # TODO log?
end
+ end
- def update_pos(pos)
- @file.pos = @seek
- @file.write "%016x" % pos
+ class IOHandler < Coolio::IOWatcher
+ def initialize(io, start_pos, pe, receive_lines)
+ $log.info "following tail of #{io.path}"
+ @io = io
+ @pe = pe
+ @receive_lines = receive_lines
+
+ if start_pos
+ # rotated
+ @pos = start_pos
+
+ else
+ # first time
+ stat = io.stat
+ fsize = stat.size
+ inode = stat.ino
+ if inode == @pe.read_inode
+ # seek to the saved position
+ @pos = @pe.read_pos
+ else
+ # seek to the end of the file.
+ # logs never duplicate but may be lost if fluentd is down.
+ @pos = fsize
+ @pe.update(inode, @pos)
+ end
+ end
+
+ io.seek(@pos)
+
+ @buffer = ''
+ super(io)
end
- def read_inode
- @file.pos = @seek + INO_OFFSET
- @file.read(8).to_i(16)
+ def on_readable
+ lines = []
+
+ while line = @io.gets
+ @buffer << line
+ @pos = @io.pos
+ unless @buffer[@buffer.length-1] == ?\n
+ break
+ end
+ lines << line
+ end
+
+ @pe.update_pos(@pos)
+ @receive_lines.call(lines) unless lines.empty?
+ rescue
+ $log.error $!.to_s
+ $log.error_backtrace
+ close
end
- def read_pos
- @file.pos = @seek
- @file.read(16).to_i(16)
+ def close
+ detach if attached?
+ @io.close unless @io.closed?
end
end
class PositionFile
def initialize(file, map, last_pos)
@@ -251,9 +359,45 @@
ino = m[3].to_i(16)
seek = file.pos - line.bytesize + path.bytesize + 1
map[path] = PositionEntry.new(file, seek)
}
new(file, map, file.pos)
+ end
+ end
+
+ # pos inode
+ # ffffffffffffffff\tffffffff\n
+ class PositionEntry
+ POS_SIZE = 16
+ INO_OFFSET = 17
+ INO_SIZE = 8
+ LN_OFFSET = 25
+ SIZE = 26
+
+ def initialize(file, seek)
+ @file = file
+ @seek = seek
+ end
+
+ def update(ino, pos)
+ @file.pos = @seek
+ @file.write "%016x\t%08x" % [pos, ino]
+ @inode = ino
+ end
+
+ def update_pos(pos)
+ @file.pos = @seek
+ @file.write "%016x" % pos
+ end
+
+ def read_inode
+ @file.pos = @seek + INO_OFFSET
+ @file.read(8).to_i(16)
+ end
+
+ def read_pos
+ @file.pos = @seek
+ @file.read(16).to_i(16)
end
end
class NullPositionEntry
require 'singleton'