lib/fluent/plugin/in_tail.rb in fluentd-0.10.42 vs lib/fluent/plugin/in_tail.rb in fluentd-0.10.43

- old
+ new

@@ -60,11 +60,13 @@ end @loop = Coolio::Loop.new @tails = @paths.map {|path| pe = @pf ? @pf[path] : MemoryPositionEntry.new - TailWatcher.new(path, @rotate_wait, pe, &method(:receive_lines)) + tw = TailWatcher.new(path, @rotate_wait, pe, &method(:receive_lines)) + tw.log = log + tw } @tails.each {|tail| tail.attach(@loop) } @thread = Thread.new(&method(:run)) @@ -80,26 +82,28 @@ end def run @loop.run rescue - $log.error "unexpected error", :error=>$!.to_s - $log.error_backtrace + log.error "unexpected error", :error=>$!.to_s + log.error_backtrace end def receive_lines(lines) es = MultiEventStream.new lines.each {|line| begin line.chomp! # remove \n time, record = parse_line(line) if time && record es.add(time, record) + else + log.warn "pattern not match: #{line.inspect}" end rescue - $log.warn line.dump, :error=>$!.to_s - $log.debug_backtrace + log.warn line.dump, :error=>$!.to_s + log.debug_backtrace end } unless es.empty? begin @@ -126,12 +130,25 @@ @timer_trigger = TimerWatcher.new(1, true, &method(:on_notify)) @stat_trigger = StatWatcher.new(path, &method(:on_notify)) @rotate_handler = RotateHandler.new(path, &method(:on_rotate)) @io_handler = nil + @log = $log end + # We use accessor approach to assign each logger, not passing log object at initialization, + # because several plugins depend on these internal classes. + # This approach avoids breaking plugins with new log_level option. + attr_accessor :log + + def log=(logger) + @log = logger + @timer_trigger.log = logger + @stat_trigger.log = logger + @rotate_handler.log = logger + end + def attach(loop) @timer_trigger.attach(loop) @stat_trigger.attach(loop) on_notify end @@ -171,11 +188,11 @@ pos = @pe.read_pos else pos = io.pos end @pe.update(inode, pos) - io_handler = IOHandler.new(io, @pe, &@receive_lines) + io_handler = IOHandler.new(io, @pe, log, &@receive_lines) else io_handler = NullIOHandler.new end @io_handler.close @io_handler = io_handler @@ -210,60 +227,66 @@ pos = fsize @pe.update(inode, pos) end io.seek(pos) - @io_handler = IOHandler.new(io, @pe, &@receive_lines) + @io_handler = IOHandler.new(io, @pe, log, &@receive_lines) else @io_handler = NullIOHandler.new end else if io && @rotate_queue.find {|req| req.io == io } return end last_io = @rotate_queue.empty? ? @io_handler.io : @rotate_queue.last.io if last_io == nil - $log.info "detected rotation of #{@path}" + log.info "detected rotation of #{@path}" # rotate imeediately if previous file is nil wait = 0 else - $log.info "detected rotation of #{@path}; waiting #{@rotate_wait} seconds" + log.info "detected rotation of #{@path}; waiting #{@rotate_wait} seconds" wait = @rotate_wait wait -= @rotate_queue.first.wait unless @rotate_queue.empty? end @rotate_queue << RotationRequest.new(io, wait) end end class TimerWatcher < Coolio::TimerWatcher def initialize(interval, repeat, &callback) @callback = callback + @log = $log super(interval, repeat) end + attr_accessor :log + def on_timer @callback.call rescue # TODO log? - $log.error $!.to_s - $log.error_backtrace + @log.error $!.to_s + @log.error_backtrace end end class StatWatcher < Coolio::StatWatcher def initialize(path, &callback) @callback = callback + @log = $log super(path) end + attr_accessor :log + def on_change(prev, cur) @callback.call rescue # TODO log? - $log.error $!.to_s - $log.error_backtrace + @log.error $!.to_s + @log.error_backtrace end end class RotationRequest def initialize(io, wait) @@ -283,12 +306,13 @@ end MAX_LINES_AT_ONCE = 1000 class IOHandler - def initialize(io, pe, &receive_lines) - $log.info "following tail of #{io.path}" + def initialize(io, pe, log, &receive_lines) + @log = log + @log.info "following tail of #{io.path}" @io = io @pe = pe @receive_lines = receive_lines @buffer = ''.force_encoding('ASCII-8BIT') @iobuf = ''.force_encoding('ASCII-8BIT') @@ -326,12 +350,12 @@ end end while read_more rescue - $log.error $!.to_s - $log.error_backtrace + @log.error $!.to_s + @log.error_backtrace close end def close @io.close unless @io.closed? @@ -356,12 +380,15 @@ def initialize(path, &on_rotate) @path = path @inode = nil @fsize = -1 # first @on_rotate = on_rotate + @log = $log end + attr_accessor :log + def on_notify begin io = File.open(@path) stat = io.stat inode = stat.ino @@ -384,11 +411,11 @@ ensure io.close if io end rescue - $log.error $!.to_s - $log.error_backtrace + @log.error $!.to_s + @log.error_backtrace end end end