lib/fluent/plugin/in_tail.rb in fluentd-0.10.6 vs lib/fluent/plugin/in_tail.rb in fluentd-0.10.7
- old
+ new
@@ -26,39 +26,53 @@
@paths = []
end
config_param :path, :string
config_param :tag, :string
+ config_param :pos_file, :string, :default => nil
def configure(conf)
super
@paths = @path.split(',').map {|path| path.strip }
if @paths.empty?
raise ConfigError, "tail: 'path' parameter is required on tail input"
end
+ if @pos_file
+ @pf_file = File.open(@pos_file, File::RDWR|File::CREAT)
+ @pf_file.sync = true
+ @pf = PositionFile.parse(@pf_file)
+ end
+
configure_parser(conf)
end
def configure_parser(conf)
@parser = TextParser.new
@parser.configure(conf)
end
def start
@loop = Coolio::Loop.new
- @paths.each {|path|
+ handlers = @paths.map {|path|
$log.debug "following tail of #{path}"
- @loop.attach Handler.new(path, method(:receive_lines))
+ pe = @pf ? @pf[path] : NullPositionEntry.instance
+ h = Handler.new(path, pe, method(:receive_lines))
+ @loop.attach h
}
+ handlers.each {|h|
+ h.on_change # initialize call
+ }
@thread = Thread.new(&method(:run))
end
def shutdown
+ @loop.watchers.each {|w| w.detach }
@loop.stop
@thread.join
+ @pf_file.close if @pf_file
end
def run
@loop.run
rescue
@@ -88,40 +102,53 @@
def parse_line(line)
return @parser.parse(line)
end
- # seek to the end of file first.
- # logs never duplicate but may be lost if fluent is down.
class Handler < Coolio::StatWatcher
- def initialize(path, callback)
- @pos = File.stat(path).size
+ def initialize(path, pe, callback)
+ stat = File.lstat(path)
+ @pe = pe
+ @inode = stat.ino
+ if @inode == @pe.read_inode
+ # seek to the saved position
+ @pos = @pe.read_pos
+ else
+ # seek to the end of file first.
+ # logs never duplicate but may be lost if fluent is down.
+ @pos = stat.size
+ @pe.update(@inode, stat.size)
+ end
@buffer = ''
@callback = callback
super(path)
end
def on_change
lines = []
+ inode = nil
File.open(path) {|f|
- if f.lstat.size < @pos
+ stat = f.lstat
+ inode = stat.ino
+
+ if @inode != inode || stat.size < @pos
# moved or deleted
@pos = 0
else
f.seek(@pos)
end
line = f.gets
unless line
- return
+ break
end
@buffer << line
unless line[line.length-1] == ?\n
@pos = f.pos
- return
+ break
end
lines << @buffer
@buffer = ''
@@ -134,14 +161,111 @@
end
@pos = f.pos
}
+ if @inode != inode
+ @pe.update(inode, @pos)
+ @inode = inode
+ else
+ @pe.update_pos(@pos)
+ end
+
@callback.call(lines)
rescue Errno::ENOENT
# moved or deleted
@pos = 0
+ # TODO rescue
+ end
+ end
+
+ # pos inode
+ # ffffffffffffffff\tffffffff\n
+ class PositionEntry
+ POS_SIZE = 16
+ INO_OFFSET = 17
+ INO_SIZE = 8
+ LN_OFFSET = 25
+ SIZE = 26
+
+ def initialize(file, seek)
+ @file = file
+ @seek = seek
+ end
+
+ def update(ino, pos)
+ @file.pos = @seek
+ @file.write "%016x\t%08x" % [pos, ino]
+ @inode = ino
+ end
+
+ def update_pos(pos)
+ @file.pos = @seek
+ @file.write "%016x" % pos
+ end
+
+ def read_inode
+ @file.pos = @seek + INO_OFFSET
+ @file.read(8).to_i(16)
+ end
+
+ def read_pos
+ @file.pos = @seek
+ @file.read(16).to_i(16)
+ end
+ end
+
+ class PositionFile
+ def initialize(file, map, last_pos)
+ @file = file
+ @map = map
+ @last_pos = last_pos
+ end
+
+ def [](path)
+ if m = @map[path]
+ return m
+ end
+
+ @file.pos = @last_pos
+ @file.write path
+ @file.write "\t"
+ seek = @file.pos
+ @file.write "0000000000000000\t00000000\n"
+ @last_pos = @file.pos
+
+ @map[path] = PositionEntry.new(@file, seek)
+ end
+
+ def self.parse(file)
+ map = {}
+ file.pos = 0
+ file.each_line {|line|
+ m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
+ next unless m
+ path = m[1]
+ pos = m[2].to_i(16)
+ ino = m[3].to_i(16)
+ seek = file.pos - line.bytesize + path.bytesize + 1
+ map[path] = PositionEntry.new(file, seek)
+ }
+ new(file, map, file.pos)
+ end
+ end
+
+ class NullPositionEntry
+ require 'singleton'
+ include Singleton
+ def update(ino, pos)
+ end
+ def update_pos(pos)
+ end
+ def read_pos
+ 0
+ end
+ def read_inode
+ 0
end
end
end