lib/fluent/plugin/in_tail.rb in fluentd-0.10.49 vs lib/fluent/plugin/in_tail.rb in fluentd-0.10.50
- old
+ new
@@ -127,11 +127,11 @@
stop_watchers(unwatched, false, true) unless unwatched.empty?
start_watchers(added) unless added.empty?
end
def setup_watcher(path, pe)
- tw = TailWatcher.new(path, @rotate_wait, pe, log, method(:update_watcher), &method(:receive_lines))
+ tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, method(:update_watcher), &method(:receive_lines))
tw.attach(@loop)
tw
end
def start_watchers(paths)
@@ -156,11 +156,11 @@
paths.each { |path|
tw = @tails.delete(path)
if tw
tw.unwatched = unwatched
if immediate
- close_watcher(tw)
+ close_watcher(tw, false)
else
close_watcher_after_rotate_wait(tw)
end
end
}
@@ -171,12 +171,16 @@
rotated_tw = @tails[path]
@tails[path] = setup_watcher(path, pe)
close_watcher_after_rotate_wait(rotated_tw) if rotated_tw
end
- def close_watcher(tw)
- tw.close
+ # TailWatcher#close is called by another thread at shutdown phase.
+ # It causes 'can't modify string; temporarily locked' error in IOHandler
+ # so adding close_io argument to avoid this problem.
+ # At shutdown, IOHandler's io will be released automatically after detached the event loop
+ def close_watcher(tw, close_io = true)
+ tw.close(close_io)
flush_buffer(tw)
if tw.unwatched && @pf
@pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION)
end
end
@@ -187,21 +191,22 @@
end
def flush_buffer(tw)
if lb = tw.line_buffer
lb.chomp!
- time, record = parse_line(lb)
- if time && record
- tag = if @tag_prefix || @tag_suffix
- @tag_prefix + tw.tag + @tag_suffix
- else
- @tag
- end
- Engine.emit(tag, time, record)
- else
- log.warn "got incomplete line at shutdown from #{tw.path}: #{lb.inspect}"
- end
+ @parser.parse(lb) { |time, record|
+ if time && record
+ tag = if @tag_prefix || @tag_suffix
+ @tag_prefix + tw.tag + @tag_suffix
+ else
+ @tag
+ end
+ Engine.emit(tag, time, record)
+ else
+ log.warn "got incomplete line at shutdown from #{tw.path}: #{lb.inspect}"
+ end
+ }
end
end
def run
@loop.run
@@ -224,23 +229,20 @@
# ignore errors. Engine shows logs and backtraces.
end
end
end
- def parse_line(line)
- return @parser.parse(line)
- end
-
def convert_line_to_event(line, es)
begin
line.chomp! # remove \n
- time, record = parse_line(line)
- if time && record
- es.add(time, record)
- else
- log.warn "pattern not match: #{line.inspect}"
- end
+ @parser.parse(line) { |time, record|
+ if time && record
+ es.add(time, record)
+ else
+ log.warn "pattern not match: #{line.inspect}"
+ end
+ }
rescue => e
log.warn line.dump, :error => e.to_s
log.debug_backtrace(e.backtrace)
end
end
@@ -254,33 +256,47 @@
end
def parse_multilines(lines, tail_watcher)
lb = tail_watcher.line_buffer
es = MultiEventStream.new
- lines.each { |line|
- if @parser.parser.firstline?(line)
- if lb
- convert_line_to_event(lb, es)
- end
- lb = line
- else
- if lb.nil?
- log.warn "got incomplete line before first line from #{tail_watcher.path}: #{lb.inspect}"
+ if @parser.parser.has_firstline?
+ lines.each { |line|
+ if @parser.parser.firstline?(line)
+ if lb
+ convert_line_to_event(lb, es)
+ end
+ lb = line
else
- lb << line
+ if lb.nil?
+ log.warn "got incomplete line before first line from #{tail_watcher.path}: #{lb.inspect}"
+ else
+ lb << line
+ end
end
+ }
+ else
+ lb ||= ''
+ lines.each do |line|
+ lb << line
+ @parser.parse(lb) { |time, record|
+ if time && record
+ convert_line_to_event(lb, es)
+ lb = ''
+ end
+ }
end
- }
+ end
tail_watcher.line_buffer = lb
es
end
class TailWatcher
- def initialize(path, rotate_wait, pe, log, update_watcher, &receive_lines)
+ def initialize(path, rotate_wait, pe, log, read_from_head, update_watcher, &receive_lines)
@path = path
@rotate_wait = rotate_wait
@pe = pe || MemoryPositionEntry.new
+ @read_from_head = read_from_head
@receive_lines = receive_lines
@update_watcher = update_watcher
@timer_trigger = TimerWatcher.new(1, true, log, &method(:on_notify))
@stat_trigger = StatWatcher.new(path, log, &method(:on_notify))
@@ -311,12 +327,12 @@
def detach
@timer_trigger.detach if @timer_trigger.attached?
@stat_trigger.detach if @stat_trigger.attached?
end
- def close
- if @io_handler
+ def close(close_io = true)
+ if close_io && @io_handler
@io_handler.on_notify
@io_handler.close
end
detach
end
@@ -352,10 +368,10 @@
else
# this is MemoryPositionEntry or this is the first time fluentd started.
# seek to the end of the any files.
# logs may duplicate without this seek because it's not sure the file is
# existent file or rotated new file.
- pos = fsize
+ pos = @read_from_head ? 0 : fsize
@pe.update(inode, pos)
end
io.seek(pos)
@io_handler = IOHandler.new(io, @pe, @log, &method(:wrap_receive_lines))