lib/fluent/plugin/in_tail.rb in fluentd-1.2.4 vs lib/fluent/plugin/in_tail.rb in fluentd-1.2.5.rc1
- old
+ new
@@ -264,17 +264,20 @@
def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw.attach do |watcher|
- watcher.timer_trigger = timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify)) if watcher.enable_watch_timer
- event_loop_attach(watcher.stat_trigger) if watcher.enable_stat_watcher
+ event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger
+ event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger
end
tw
rescue => e
if tw
- tw.detach
+ tw.detach { |watcher|
+ event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
+ event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
+ }
tw.close
end
raise e
end
@@ -341,19 +344,24 @@
# TailWatcher#close is called by another thread at shutdown phase.
# It causes 'can't modify string; temporarily locked' error in IOHandler
# so adding close_io argument to avoid this problem.
# At shutdown, IOHandler's io will be released automatically after detached the event loop
def detach_watcher(tw, close_io = true)
- tw.detach
+ tw.detach { |watcher|
+ event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
+ event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
+ }
tw.close if close_io
flush_buffer(tw)
if tw.unwatched && @pf
@pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION)
end
end
def detach_watcher_after_rotate_wait(tw)
+ # Call event_loop_attach/event_loop_detach is high-cost for short-live object.
+ # If this has a problem with large number of files, use @_event_loop directly instead of timer_execute.
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw)
end
end
@@ -477,11 +485,11 @@
@read_lines_limit = read_lines_limit
@receive_lines = receive_lines
@update_watcher = update_watcher
@stat_trigger = @enable_stat_watcher ? StatWatcher.new(self, &method(:on_notify)) : nil
- @timer_trigger = nil
+ @timer_trigger = @enable_watch_timer ? TimerTrigger.new(1, log, &method(:on_notify)) : nil
@rotate_handler = RotateHandler.new(self, &method(:on_rotate))
@io_handler = nil
@log = log
@@ -511,12 +519,11 @@
on_notify
yield self
end
def detach
- @timer_trigger.detach if @enable_watch_timer && @timer_trigger && @timer_trigger.attached?
- @stat_trigger.detach if @enable_stat_watcher && @stat_trigger && @stat_trigger.attached?
+ yield self
@io_handler.on_notify if @io_handler
end
def close
if @io_handler
@@ -611,10 +618,25 @@
mpe.update(pe.read_inode, pe.read_pos)
@pe = mpe
pe # This pe will be updated in on_rotate after TailWatcher is initialized
end
+ class TimerTrigger < Coolio::TimerWatcher
+ def initialize(interval, log, &callback)
+ @callback = callback
+ @log = log
+ super(interval, true)
+ end
+
+ def on_timer
+ @callback.call
+ rescue => e
+ @log.error e.to_s
+ @log.error_backtrace
+ end
+ end
+
class StatWatcher < Coolio::StatWatcher
def initialize(watcher, &callback)
@watcher = watcher
@callback = callback
super(watcher.path)
@@ -626,10 +648,9 @@
# TODO log?
@watcher.log.error $!.to_s
@watcher.log.error_backtrace
end
end
-
class FIFO
def initialize(from_encoding, encoding)
@from_encoding = from_encoding
@encoding = encoding