lib/fluent/plugin/in_tail.rb in fluentd-0.10.49 vs lib/fluent/plugin/in_tail.rb in fluentd-0.10.50

- old
+ new

@@ -127,11 +127,11 @@ stop_watchers(unwatched, false, true) unless unwatched.empty? start_watchers(added) unless added.empty? end def setup_watcher(path, pe) - tw = TailWatcher.new(path, @rotate_wait, pe, log, method(:update_watcher), &method(:receive_lines)) + tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, method(:update_watcher), &method(:receive_lines)) tw.attach(@loop) tw end def start_watchers(paths) @@ -156,11 +156,11 @@ paths.each { |path| tw = @tails.delete(path) if tw tw.unwatched = unwatched if immediate - close_watcher(tw) + close_watcher(tw, false) else close_watcher_after_rotate_wait(tw) end end } @@ -171,12 +171,16 @@ rotated_tw = @tails[path] @tails[path] = setup_watcher(path, pe) close_watcher_after_rotate_wait(rotated_tw) if rotated_tw end - def close_watcher(tw) - tw.close + # TailWatcher#close is called by another thread at shutdown phase. + # It causes 'can't modify string; temporarily locked' error in IOHandler + # so adding close_io argument to avoid this problem. + # At shutdown, IOHandler's io will be released automatically after detached the event loop + def close_watcher(tw, close_io = true) + tw.close(close_io) flush_buffer(tw) if tw.unwatched && @pf @pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION) end end @@ -187,21 +191,22 @@ end def flush_buffer(tw) if lb = tw.line_buffer lb.chomp! - time, record = parse_line(lb) - if time && record - tag = if @tag_prefix || @tag_suffix - @tag_prefix + tw.tag + @tag_suffix - else - @tag - end - Engine.emit(tag, time, record) - else - log.warn "got incomplete line at shutdown from #{tw.path}: #{lb.inspect}" - end + @parser.parse(lb) { |time, record| + if time && record + tag = if @tag_prefix || @tag_suffix + @tag_prefix + tw.tag + @tag_suffix + else + @tag + end + Engine.emit(tag, time, record) + else + log.warn "got incomplete line at shutdown from #{tw.path}: #{lb.inspect}" + end + } end end def run @loop.run @@ -224,23 +229,20 @@ # ignore errors. Engine shows logs and backtraces. end end end - def parse_line(line) - return @parser.parse(line) - end - def convert_line_to_event(line, es) begin line.chomp! # remove \n - time, record = parse_line(line) - if time && record - es.add(time, record) - else - log.warn "pattern not match: #{line.inspect}" - end + @parser.parse(line) { |time, record| + if time && record + es.add(time, record) + else + log.warn "pattern not match: #{line.inspect}" + end + } rescue => e log.warn line.dump, :error => e.to_s log.debug_backtrace(e.backtrace) end end @@ -254,33 +256,47 @@ end def parse_multilines(lines, tail_watcher) lb = tail_watcher.line_buffer es = MultiEventStream.new - lines.each { |line| - if @parser.parser.firstline?(line) - if lb - convert_line_to_event(lb, es) - end - lb = line - else - if lb.nil? - log.warn "got incomplete line before first line from #{tail_watcher.path}: #{lb.inspect}" + if @parser.parser.has_firstline? + lines.each { |line| + if @parser.parser.firstline?(line) + if lb + convert_line_to_event(lb, es) + end + lb = line else - lb << line + if lb.nil? + log.warn "got incomplete line before first line from #{tail_watcher.path}: #{lb.inspect}" + else + lb << line + end end + } + else + lb ||= '' + lines.each do |line| + lb << line + @parser.parse(lb) { |time, record| + if time && record + convert_line_to_event(lb, es) + lb = '' + end + } end - } + end tail_watcher.line_buffer = lb es end class TailWatcher - def initialize(path, rotate_wait, pe, log, update_watcher, &receive_lines) + def initialize(path, rotate_wait, pe, log, read_from_head, update_watcher, &receive_lines) @path = path @rotate_wait = rotate_wait @pe = pe || MemoryPositionEntry.new + @read_from_head = read_from_head @receive_lines = receive_lines @update_watcher = update_watcher @timer_trigger = TimerWatcher.new(1, true, log, &method(:on_notify)) @stat_trigger = StatWatcher.new(path, log, &method(:on_notify)) @@ -311,12 +327,12 @@ def detach @timer_trigger.detach if @timer_trigger.attached? @stat_trigger.detach if @stat_trigger.attached? end - def close - if @io_handler + def close(close_io = true) + if close_io && @io_handler @io_handler.on_notify @io_handler.close end detach end @@ -352,10 +368,10 @@ else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. - pos = fsize + pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end io.seek(pos) @io_handler = IOHandler.new(io, @pe, @log, &method(:wrap_receive_lines))