lib/fluent/plugin/in_tail.rb in fluentd-0.10.6 vs lib/fluent/plugin/in_tail.rb in fluentd-0.10.7

- old
+ new

@@ -26,39 +26,53 @@ @paths = [] end config_param :path, :string config_param :tag, :string + config_param :pos_file, :string, :default => nil def configure(conf) super @paths = @path.split(',').map {|path| path.strip } if @paths.empty? raise ConfigError, "tail: 'path' parameter is required on tail input" end + if @pos_file + @pf_file = File.open(@pos_file, File::RDWR|File::CREAT) + @pf_file.sync = true + @pf = PositionFile.parse(@pf_file) + end + configure_parser(conf) end def configure_parser(conf) @parser = TextParser.new @parser.configure(conf) end def start @loop = Coolio::Loop.new - @paths.each {|path| + handlers = @paths.map {|path| $log.debug "following tail of #{path}" - @loop.attach Handler.new(path, method(:receive_lines)) + pe = @pf ? @pf[path] : NullPositionEntry.instance + h = Handler.new(path, pe, method(:receive_lines)) + @loop.attach h } + handlers.each {|h| + h.on_change # initialize call + } @thread = Thread.new(&method(:run)) end def shutdown + @loop.watchers.each {|w| w.detach } @loop.stop @thread.join + @pf_file.close if @pf_file end def run @loop.run rescue @@ -88,40 +102,53 @@ def parse_line(line) return @parser.parse(line) end - # seek to the end of file first. - # logs never duplicate but may be lost if fluent is down. class Handler < Coolio::StatWatcher - def initialize(path, callback) - @pos = File.stat(path).size + def initialize(path, pe, callback) + stat = File.lstat(path) + @pe = pe + @inode = stat.ino + if @inode == @pe.read_inode + # seek to the saved position + @pos = @pe.read_pos + else + # seek to the end of file first. + # logs never duplicate but may be lost if fluent is down. + @pos = stat.size + @pe.update(@inode, stat.size) + end @buffer = '' @callback = callback super(path) end def on_change lines = [] + inode = nil File.open(path) {|f| - if f.lstat.size < @pos + stat = f.lstat + inode = stat.ino + + if @inode != inode || stat.size < @pos # moved or deleted @pos = 0 else f.seek(@pos) end line = f.gets unless line - return + break end @buffer << line unless line[line.length-1] == ?\n @pos = f.pos - return + break end lines << @buffer @buffer = '' @@ -134,14 +161,111 @@ end @pos = f.pos } + if @inode != inode + @pe.update(inode, @pos) + @inode = inode + else + @pe.update_pos(@pos) + end + @callback.call(lines) rescue Errno::ENOENT # moved or deleted @pos = 0 + # TODO rescue + end + end + + # pos inode + # ffffffffffffffff\tffffffff\n + class PositionEntry + POS_SIZE = 16 + INO_OFFSET = 17 + INO_SIZE = 8 + LN_OFFSET = 25 + SIZE = 26 + + def initialize(file, seek) + @file = file + @seek = seek + end + + def update(ino, pos) + @file.pos = @seek + @file.write "%016x\t%08x" % [pos, ino] + @inode = ino + end + + def update_pos(pos) + @file.pos = @seek + @file.write "%016x" % pos + end + + def read_inode + @file.pos = @seek + INO_OFFSET + @file.read(8).to_i(16) + end + + def read_pos + @file.pos = @seek + @file.read(16).to_i(16) + end + end + + class PositionFile + def initialize(file, map, last_pos) + @file = file + @map = map + @last_pos = last_pos + end + + def [](path) + if m = @map[path] + return m + end + + @file.pos = @last_pos + @file.write path + @file.write "\t" + seek = @file.pos + @file.write "0000000000000000\t00000000\n" + @last_pos = @file.pos + + @map[path] = PositionEntry.new(@file, seek) + end + + def self.parse(file) + map = {} + file.pos = 0 + file.each_line {|line| + m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line) + next unless m + path = m[1] + pos = m[2].to_i(16) + ino = m[3].to_i(16) + seek = file.pos - line.bytesize + path.bytesize + 1 + map[path] = PositionEntry.new(file, seek) + } + new(file, map, file.pos) + end + end + + class NullPositionEntry + require 'singleton' + include Singleton + def update(ino, pos) + end + def update_pos(pos) + end + def read_pos + 0 + end + def read_inode + 0 end end end