lib/fluent/plugin/in_tail.rb in fluentd-1.2.4 vs lib/fluent/plugin/in_tail.rb in fluentd-1.2.5.rc1

- old
+ new

@@ -264,17 +264,20 @@ def setup_watcher(path, pe) line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines)) tw.attach do |watcher| - watcher.timer_trigger = timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify)) if watcher.enable_watch_timer - event_loop_attach(watcher.stat_trigger) if watcher.enable_stat_watcher + event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger + event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger end tw rescue => e if tw - tw.detach + tw.detach { |watcher| + event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger + event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger + } tw.close end raise e end @@ -341,19 +344,24 @@ # TailWatcher#close is called by another thread at shutdown phase. # It causes 'can't modify string; temporarily locked' error in IOHandler # so adding close_io argument to avoid this problem. # At shutdown, IOHandler's io will be released automatically after detached the event loop def detach_watcher(tw, close_io = true) - tw.detach + tw.detach { |watcher| + event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger + event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger + } tw.close if close_io flush_buffer(tw) if tw.unwatched && @pf @pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION) end end def detach_watcher_after_rotate_wait(tw) + # Call event_loop_attach/event_loop_detach is high-cost for short-live object. + # If this has a problem with large number of files, use @_event_loop directly instead of timer_execute. timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do detach_watcher(tw) end end @@ -477,11 +485,11 @@ @read_lines_limit = read_lines_limit @receive_lines = receive_lines @update_watcher = update_watcher @stat_trigger = @enable_stat_watcher ? StatWatcher.new(self, &method(:on_notify)) : nil - @timer_trigger = nil + @timer_trigger = @enable_watch_timer ? TimerTrigger.new(1, log, &method(:on_notify)) : nil @rotate_handler = RotateHandler.new(self, &method(:on_rotate)) @io_handler = nil @log = log @@ -511,12 +519,11 @@ on_notify yield self end def detach - @timer_trigger.detach if @enable_watch_timer && @timer_trigger && @timer_trigger.attached? - @stat_trigger.detach if @enable_stat_watcher && @stat_trigger && @stat_trigger.attached? + yield self @io_handler.on_notify if @io_handler end def close if @io_handler @@ -611,10 +618,25 @@ mpe.update(pe.read_inode, pe.read_pos) @pe = mpe pe # This pe will be updated in on_rotate after TailWatcher is initialized end + class TimerTrigger < Coolio::TimerWatcher + def initialize(interval, log, &callback) + @callback = callback + @log = log + super(interval, true) + end + + def on_timer + @callback.call + rescue => e + @log.error e.to_s + @log.error_backtrace + end + end + class StatWatcher < Coolio::StatWatcher def initialize(watcher, &callback) @watcher = watcher @callback = callback super(watcher.path) @@ -626,10 +648,9 @@ # TODO log? @watcher.log.error $!.to_s @watcher.log.error_backtrace end end - class FIFO def initialize(from_encoding, encoding) @from_encoding = from_encoding @encoding = encoding