# # Fluent # # Copyright (C) 2011 FURUHASHI Sadayuki # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # module Fluent class TailInput < Input Plugin.register_input('tail', self) def initialize super @paths = [] end config_param :path, :string config_param :tag, :string config_param :rotate_wait, :time, :default => 5 config_param :pos_file, :string, :default => nil attr_reader :paths def configure(conf) super @paths = @path.split(',').map {|path| path.strip } if @paths.empty? raise ConfigError, "tail: 'path' parameter is required on tail input" end if @pos_file @pf_file = File.open(@pos_file, File::RDWR|File::CREAT) @pf_file.sync = true @pf = PositionFile.parse(@pf_file) else $log.warn "'pos_file PATH' parameter is not set to a 'tail' source." $log.warn "this parameter is highly recommended to save the position to resume tailing." end configure_parser(conf) end def configure_parser(conf) @parser = TextParser.new @parser.configure(conf) end def start @loop = Coolio::Loop.new @tails = @paths.map {|path| pe = @pf ? @pf[path] : MemoryPositionEntry.new TailWatcher.new(path, @rotate_wait, pe, &method(:receive_lines)) } @tails.each {|tail| tail.attach(@loop) } @thread = Thread.new(&method(:run)) end def shutdown @tails.each {|tail| tail.close } @loop.stop @thread.join @pf_file.close if @pf_file end def run @loop.run rescue $log.error "unexpected error", :error=>$!.to_s $log.error_backtrace end def receive_lines(lines) es = MultiEventStream.new lines.each {|line| begin line.chomp! # remove \n time, record = parse_line(line) if time && record es.add(time, record) end rescue $log.warn line.dump, :error=>$!.to_s $log.debug_backtrace end } unless es.empty? begin Engine.emit_stream(@tag, es) rescue # ignore errors. Engine shows logs and backtraces. end end end def parse_line(line) return @parser.parse(line) end class TailWatcher def initialize(path, rotate_wait, pe, &receive_lines) @path = path @rotate_wait = rotate_wait @pe = pe || MemoryPositionEntry.new @receive_lines = receive_lines @rotate_queue = [] @timer_trigger = TimerWatcher.new(1, true, &method(:on_notify)) @stat_trigger = StatWatcher.new(path, &method(:on_notify)) @rotate_handler = RotateHandler.new(path, &method(:on_rotate)) @io_handler = nil end def attach(loop) @timer_trigger.attach(loop) @stat_trigger.attach(loop) on_notify end def detach @timer_trigger.detach if @timer_trigger.attached? @stat_trigger.detach if @stat_trigger.attached? end def close @rotate_queue.reject! {|req| req.io.close true } detach end def on_notify @rotate_handler.on_notify return unless @io_handler @io_handler.on_notify # proceeds rotate queue return if @rotate_queue.empty? @rotate_queue.first.tick while @rotate_queue.first.ready? if io = @rotate_queue.first.io stat = io.stat inode = stat.ino if inode == @pe.read_inode # rotated file has the same inode number with the last file. # assuming following situation: # a) file was once renamed and backed, or # b) symlink or hardlink to the same file is recreated # in either case, seek to the saved position pos = @pe.read_pos else pos = io.pos end @pe.update(inode, pos) io_handler = IOHandler.new(io, @pe, &@receive_lines) else io_handler = NullIOHandler.new end @io_handler.close @io_handler = io_handler @rotate_queue.shift break if @rotate_queue.empty? end end def on_rotate(io) if @io_handler == nil if io # first time stat = io.stat fsize = stat.size inode = stat.ino last_inode = @pe.read_inode if inode == last_inode # seek to the saved position pos = @pe.read_pos elsif last_inode != 0 # this is FilePositionEntry and fluentd once started. # read data from the head of the rotated file. # logs never duplicate because this file is a rotated new file. pos = 0 @pe.update(inode, pos) else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. pos = fsize @pe.update(inode, pos) end io.seek(pos) @io_handler = IOHandler.new(io, @pe, &@receive_lines) else @io_handler = NullIOHandler.new end else if io && @rotate_queue.find {|req| req.io == io } return end last_io = @rotate_queue.empty? ? @io_handler.io : @rotate_queue.last.io if last_io == nil $log.info "detected rotation of #{@path}" # rotate imeediately if previous file is nil wait = 0 else $log.info "detected rotation of #{@path}; waiting #{@rotate_wait} seconds" wait = @rotate_wait wait -= @rotate_queue.first.wait unless @rotate_queue.empty? end @rotate_queue << RotationRequest.new(io, wait) end end class TimerWatcher < Coolio::TimerWatcher def initialize(interval, repeat, &callback) @callback = callback super(interval, repeat) end def on_timer @callback.call rescue # TODO log? $log.error $!.to_s $log.error_backtrace end end class StatWatcher < Coolio::StatWatcher def initialize(path, &callback) @callback = callback super(path) end def on_change(prev, cur) @callback.call rescue # TODO log? $log.error $!.to_s $log.error_backtrace end end class RotationRequest def initialize(io, wait) @io = io @wait = wait end attr_reader :io def tick @wait -= 1 end def ready? @wait <= 0 end end MAX_LINES_AT_ONCE = 1000 class IOHandler def initialize(io, pe, &receive_lines) $log.info "following tail of #{io.path}" @io = io @pe = pe @receive_lines = receive_lines @buffer = ''.force_encoding('ASCII-8BIT') @iobuf = ''.force_encoding('ASCII-8BIT') end attr_reader :io def on_notify begin lines = [] read_more = false begin while true if @buffer.empty? @io.read_nonblock(2048, @buffer) else @buffer << @io.read_nonblock(2048, @iobuf) end while line = @buffer.slice!(/.*?\n/m) lines << line end if lines.size >= MAX_LINES_AT_ONCE # not to use too much memory in case the file is very large read_more = true break end end rescue EOFError end unless lines.empty? @receive_lines.call(lines) @pe.update_pos(@io.pos - @buffer.bytesize) end end while read_more rescue $log.error $!.to_s $log.error_backtrace close end def close @io.close unless @io.closed? end end class NullIOHandler def initialize end def io end def on_notify end def close end end class RotateHandler def initialize(path, &on_rotate) @path = path @inode = nil @fsize = -1 # first @on_rotate = on_rotate @path = path end def on_notify begin io = File.open(@path) stat = io.stat inode = stat.ino fsize = stat.size rescue Errno::ENOENT # moved or deleted inode = nil fsize = 0 end begin if @inode != inode || fsize < @fsize # rotated or truncated @on_rotate.call(io) io = nil end @inode = inode @fsize = fsize ensure io.close if io end rescue $log.error $!.to_s $log.error_backtrace end end end class PositionFile def initialize(file, map, last_pos) @file = file @map = map @last_pos = last_pos end def [](path) if m = @map[path] return m end @file.pos = @last_pos @file.write path @file.write "\t" seek = @file.pos @file.write "0000000000000000\t00000000\n" @last_pos = @file.pos @map[path] = FilePositionEntry.new(@file, seek) end def self.parse(file) map = {} file.pos = 0 file.each_line {|line| m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line) next unless m path = m[1] pos = m[2].to_i(16) ino = m[3].to_i(16) seek = file.pos - line.bytesize + path.bytesize + 1 map[path] = FilePositionEntry.new(file, seek) } new(file, map, file.pos) end end # pos inode # ffffffffffffffff\tffffffff\n class FilePositionEntry POS_SIZE = 16 INO_OFFSET = 17 INO_SIZE = 8 LN_OFFSET = 25 SIZE = 26 def initialize(file, seek) @file = file @seek = seek end def update(ino, pos) @file.pos = @seek @file.write "%016x\t%08x" % [pos, ino] @inode = ino end def update_pos(pos) @file.pos = @seek @file.write "%016x" % pos end def read_inode @file.pos = @seek + INO_OFFSET raw = @file.read(8) raw ? raw.to_i(16) : 0 end def read_pos @file.pos = @seek raw = @file.read(16) raw ? raw.to_i(16) : 0 end end class MemoryPositionEntry def initialize @pos = 0 @inode = 0 end def update(ino, pos) @inode = ino @pos = pos end def update_pos(pos) @pos = pos end def read_pos @pos end def read_inode @inode end end end end