lib/fluent/plugin/in_tail.rb in fluentd-0.10.42 vs lib/fluent/plugin/in_tail.rb in fluentd-0.10.43
- old
+ new
@@ -60,11 +60,13 @@
end
@loop = Coolio::Loop.new
@tails = @paths.map {|path|
pe = @pf ? @pf[path] : MemoryPositionEntry.new
- TailWatcher.new(path, @rotate_wait, pe, &method(:receive_lines))
+ tw = TailWatcher.new(path, @rotate_wait, pe, &method(:receive_lines))
+ tw.log = log
+ tw
}
@tails.each {|tail|
tail.attach(@loop)
}
@thread = Thread.new(&method(:run))
@@ -80,26 +82,28 @@
end
def run
@loop.run
rescue
- $log.error "unexpected error", :error=>$!.to_s
- $log.error_backtrace
+ log.error "unexpected error", :error=>$!.to_s
+ log.error_backtrace
end
def receive_lines(lines)
es = MultiEventStream.new
lines.each {|line|
begin
line.chomp! # remove \n
time, record = parse_line(line)
if time && record
es.add(time, record)
+ else
+ log.warn "pattern not match: #{line.inspect}"
end
rescue
- $log.warn line.dump, :error=>$!.to_s
- $log.debug_backtrace
+ log.warn line.dump, :error=>$!.to_s
+ log.debug_backtrace
end
}
unless es.empty?
begin
@@ -126,12 +130,25 @@
@timer_trigger = TimerWatcher.new(1, true, &method(:on_notify))
@stat_trigger = StatWatcher.new(path, &method(:on_notify))
@rotate_handler = RotateHandler.new(path, &method(:on_rotate))
@io_handler = nil
+ @log = $log
end
+ # We use accessor approach to assign each logger, not passing log object at initialization,
+ # because several plugins depend on these internal classes.
+ # This approach avoids breaking plugins with new log_level option.
+ attr_accessor :log
+
+ def log=(logger)
+ @log = logger
+ @timer_trigger.log = logger
+ @stat_trigger.log = logger
+ @rotate_handler.log = logger
+ end
+
def attach(loop)
@timer_trigger.attach(loop)
@stat_trigger.attach(loop)
on_notify
end
@@ -171,11 +188,11 @@
pos = @pe.read_pos
else
pos = io.pos
end
@pe.update(inode, pos)
- io_handler = IOHandler.new(io, @pe, &@receive_lines)
+ io_handler = IOHandler.new(io, @pe, log, &@receive_lines)
else
io_handler = NullIOHandler.new
end
@io_handler.close
@io_handler = io_handler
@@ -210,60 +227,66 @@
pos = fsize
@pe.update(inode, pos)
end
io.seek(pos)
- @io_handler = IOHandler.new(io, @pe, &@receive_lines)
+ @io_handler = IOHandler.new(io, @pe, log, &@receive_lines)
else
@io_handler = NullIOHandler.new
end
else
if io && @rotate_queue.find {|req| req.io == io }
return
end
last_io = @rotate_queue.empty? ? @io_handler.io : @rotate_queue.last.io
if last_io == nil
- $log.info "detected rotation of #{@path}"
+ log.info "detected rotation of #{@path}"
# rotate imeediately if previous file is nil
wait = 0
else
- $log.info "detected rotation of #{@path}; waiting #{@rotate_wait} seconds"
+ log.info "detected rotation of #{@path}; waiting #{@rotate_wait} seconds"
wait = @rotate_wait
wait -= @rotate_queue.first.wait unless @rotate_queue.empty?
end
@rotate_queue << RotationRequest.new(io, wait)
end
end
class TimerWatcher < Coolio::TimerWatcher
def initialize(interval, repeat, &callback)
@callback = callback
+ @log = $log
super(interval, repeat)
end
+ attr_accessor :log
+
def on_timer
@callback.call
rescue
# TODO log?
- $log.error $!.to_s
- $log.error_backtrace
+ @log.error $!.to_s
+ @log.error_backtrace
end
end
class StatWatcher < Coolio::StatWatcher
def initialize(path, &callback)
@callback = callback
+ @log = $log
super(path)
end
+ attr_accessor :log
+
def on_change(prev, cur)
@callback.call
rescue
# TODO log?
- $log.error $!.to_s
- $log.error_backtrace
+ @log.error $!.to_s
+ @log.error_backtrace
end
end
class RotationRequest
def initialize(io, wait)
@@ -283,12 +306,13 @@
end
MAX_LINES_AT_ONCE = 1000
class IOHandler
- def initialize(io, pe, &receive_lines)
- $log.info "following tail of #{io.path}"
+ def initialize(io, pe, log, &receive_lines)
+ @log = log
+ @log.info "following tail of #{io.path}"
@io = io
@pe = pe
@receive_lines = receive_lines
@buffer = ''.force_encoding('ASCII-8BIT')
@iobuf = ''.force_encoding('ASCII-8BIT')
@@ -326,12 +350,12 @@
end
end while read_more
rescue
- $log.error $!.to_s
- $log.error_backtrace
+ @log.error $!.to_s
+ @log.error_backtrace
close
end
def close
@io.close unless @io.closed?
@@ -356,12 +380,15 @@
def initialize(path, &on_rotate)
@path = path
@inode = nil
@fsize = -1 # first
@on_rotate = on_rotate
+ @log = $log
end
+ attr_accessor :log
+
def on_notify
begin
io = File.open(@path)
stat = io.stat
inode = stat.ino
@@ -384,11 +411,11 @@
ensure
io.close if io
end
rescue
- $log.error $!.to_s
- $log.error_backtrace
+ @log.error $!.to_s
+ @log.error_backtrace
end
end
end