lib/fluent/plugin/in_tail.rb in fluentd-0.10.44 vs lib/fluent/plugin/in_tail.rb in fluentd-0.10.45
- old
+ new
@@ -14,12 +14,672 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Fluent
- class TailInput < Input
+ class NewTailInput < Input
Plugin.register_input('tail', self)
+ def initialize
+ super
+ @paths = []
+ @tails = {}
+ end
+
+ config_param :path, :string
+ config_param :tag, :string
+ config_param :rotate_wait, :time, :default => 5
+ config_param :pos_file, :string, :default => nil
+ config_param :read_from_head, :bool, :default => false
+ config_param :refresh_interval, :time, :default => 60
+
+ attr_reader :paths
+
+ def configure(conf)
+ super
+
+ @paths = @path.split(',').map {|path| path.strip }
+ if @paths.empty?
+ raise ConfigError, "tail: 'path' parameter is required on tail input"
+ end
+
+ unless @pos_file
+ $log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
+ $log.warn "this parameter is highly recommended to save the position to resume tailing."
+ end
+
+ configure_parser(conf)
+ configure_tag
+
+ @multiline_mode = conf['format'] == 'multiline'
+ @receive_handler = if @multiline_mode
+ method(:parse_multilines)
+ else
+ method(:parse_singleline)
+ end
+ end
+
+ def configure_parser(conf)
+ @parser = TextParser.new
+ @parser.configure(conf)
+ end
+
+ def configure_tag
+ if @tag.index('*')
+ @tag_prefix, @tag_suffix = @tag.split('*')
+ @tag_suffix ||= ''
+ else
+ @tag_prefix = nil
+ @tag_suffix = nil
+ end
+ end
+
+ def start
+ if @pos_file
+ @pf_file = File.open(@pos_file, File::RDWR|File::CREAT, DEFAULT_FILE_PERMISSION)
+ @pf_file.sync = true
+ @pf = PositionFile.parse(@pf_file)
+ end
+
+ @loop = Coolio::Loop.new
+ refresh_watchers
+
+ @refresh_trigger = TailWatcher::TimerWatcher.new(@refresh_interval, true, log, &method(:refresh_watchers))
+ @refresh_trigger.attach(@loop)
+ @thread = Thread.new(&method(:run))
+ end
+
+ def shutdown
+ @refresh_trigger.detach if @refresh_trigger && @refresh_trigger.attached?
+
+ stop_watchers(@tails.keys, true)
+ @loop.stop rescue nil # when all watchers are detached, `stop` raises RuntimeError. We can ignore this exception.
+ @thread.join
+ @pf_file.close if @pf_file
+ end
+
+ def expand_paths
+ date = Time.now
+ paths = []
+ @paths.each { |path|
+ path = date.strftime(path)
+ if path.include?('*')
+ paths += Dir.glob(path)
+ else
+ # When file is not created yet, Dir.glob returns an empty array. So just add when path is static.
+ paths << path
+ end
+ }
+ paths
+ end
+
+ # in_tail with '*' path doesn't check rotation file equality at refresh phase.
+ # So you should not use '*' path when your logs will be rotated by another tool.
+ # It will cause log duplication after updated watch files.
+ # In such case, you should separate log directory and specify two paths in path parameter.
+ # e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file
+ def refresh_watchers
+ target_paths = expand_paths
+ existence_paths = @tails.keys
+
+ unwatched = existence_paths - target_paths
+ added = target_paths - existence_paths
+
+ stop_watchers(unwatched, false, true) unless unwatched.empty?
+ start_watchers(added) unless added.empty?
+ end
+
+ def setup_watcher(path, pe)
+ tw = TailWatcher.new(path, @rotate_wait, pe, log, method(:update_watcher), &method(:receive_lines))
+ tw.attach(@loop)
+ tw
+ end
+
+ def start_watchers(paths)
+ paths.each { |path|
+ pe = nil
+ if @pf
+ pe = @pf[path]
+ if @read_from_head && pe.read_inode.zero?
+ pe.update(File::Stat.new(path).ino, 0)
+ end
+ end
+
+ @tails[path] = setup_watcher(path, pe)
+ }
+ end
+
+ def stop_watchers(paths, immediate = false, unwatched = false)
+ paths.each { |path|
+ tw = @tails.delete(path)
+ if tw
+ tw.unwatched = unwatched
+ if immediate
+ close_watcher(tw)
+ else
+ close_watcher_after_rotate_wait(tw)
+ end
+ end
+ }
+ end
+
+ # refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
+ def update_watcher(path, pe)
+ rotated_tw = @tails[path]
+ @tails[path] = setup_watcher(path, pe)
+ close_watcher_after_rotate_wait(rotated_tw) if rotated_tw
+ end
+
+ def close_watcher(tw)
+ tw.close
+ flush_buffer(tw)
+ if tw.unwatched && @pf
+ @pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION)
+ end
+ end
+
+ def close_watcher_after_rotate_wait(tw)
+ closer = TailWatcher::Closer.new(@rotate_wait, tw, log, &method(:close_watcher))
+ closer.attach(@loop)
+ end
+
+ def flush_buffer(tw)
+ if lb = tw.line_buffer
+ lb.chomp!
+ time, record = parse_line(lb)
+ if time && record
+ tag = if @tag_prefix || @tag_suffix
+ @tag_prefix + tail_watcher.tag + @tag_suffix
+ else
+ @tag
+ end
+ Engine.emit(tag, time, record)
+ else
+ log.warn "got incomplete line at shutdown from #{tw.path}: #{lb.inspect}"
+ end
+ end
+ end
+
+ def run
+ @loop.run
+ rescue
+ log.error "unexpected error", :error=>$!.to_s
+ log.error_backtrace
+ end
+
+ def receive_lines(lines, tail_watcher)
+ es = @receive_handler.call(lines, tail_watcher)
+ unless es.empty?
+ tag = if @tag_prefix || @tag_suffix
+ @tag_prefix + tail_watcher.tag + @tag_suffix
+ else
+ @tag
+ end
+ begin
+ Engine.emit_stream(tag, es)
+ rescue
+ # ignore errors. Engine shows logs and backtraces.
+ end
+ end
+ end
+
+ def parse_line(line)
+ return @parser.parse(line)
+ end
+
+ def convert_line_to_event(line, es)
+ begin
+ line.chomp! # remove \n
+ time, record = parse_line(line)
+ if time && record
+ es.add(time, record)
+ else
+ log.warn "pattern not match: #{line.inspect}"
+ end
+ rescue => e
+ log.warn line.dump, :error => e.to_s
+ log.debug_backtrace(e)
+ end
+ end
+
+ def parse_singleline(lines, tail_watcher)
+ es = MultiEventStream.new
+ lines.each { |line|
+ convert_line_to_event(line, es)
+ }
+ es
+ end
+
+ def parse_multilines(lines, tail_watcher)
+ lb = tail_watcher.line_buffer
+ es = MultiEventStream.new
+ lines.each { |line|
+ if @parser.parser.firstline?(line)
+ if lb
+ convert_line_to_event(lb, es)
+ end
+ lb = line
+ else
+ if lb.nil?
+ log.warn "got incomplete line before first line from #{tail_watcher.path}: #{lb.inspect}"
+ else
+ lb << line
+ end
+ end
+ }
+ tail_watcher.line_buffer = lb
+ es
+ end
+
+ class TailWatcher
+ def initialize(path, rotate_wait, pe, log, update_watcher, &receive_lines)
+ @path = path
+ @rotate_wait = rotate_wait
+ @pe = pe || MemoryPositionEntry.new
+ @receive_lines = receive_lines
+ @update_watcher = update_watcher
+
+ @timer_trigger = TimerWatcher.new(1, true, log, &method(:on_notify))
+ @stat_trigger = StatWatcher.new(path, log, &method(:on_notify))
+
+ @rotate_handler = RotateHandler.new(path, log, &method(:on_rotate))
+ @io_handler = nil
+ @log = log
+ end
+
+ attr_reader :path
+ attr_accessor :line_buffer
+ attr_accessor :unwatched # This is used for removing position entry from PositionFile
+
+ def tag
+ @parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '')
+ end
+
+ def wrap_receive_lines(lines)
+ @receive_lines.call(lines, self)
+ end
+
+ def attach(loop)
+ @timer_trigger.attach(loop)
+ @stat_trigger.attach(loop)
+ on_notify
+ end
+
+ def detach
+ @timer_trigger.detach if @timer_trigger.attached?
+ @stat_trigger.detach if @stat_trigger.attached?
+ end
+
+ def close
+ if @io_handler
+ @io_handler.on_notify
+ @io_handler.close
+ end
+ detach
+ end
+
+ def on_notify
+ @rotate_handler.on_notify
+ return unless @io_handler
+ @io_handler.on_notify
+ end
+
+ def on_rotate(io)
+ if @io_handler == nil
+ if io
+ # first time
+ stat = io.stat
+ fsize = stat.size
+ inode = stat.ino
+
+ last_inode = @pe.read_inode
+ if inode == last_inode
+ # rotated file has the same inode number with the last file.
+ # assuming following situation:
+ # a) file was once renamed and backed, or
+ # b) symlink or hardlink to the same file is recreated
+ # in either case, seek to the saved position
+ pos = @pe.read_pos
+ elsif last_inode != 0
+ # this is FilePositionEntry and fluentd once started.
+ # read data from the head of the rotated file.
+ # logs never duplicate because this file is a rotated new file.
+ pos = 0
+ @pe.update(inode, pos)
+ else
+ # this is MemoryPositionEntry or this is the first time fluentd started.
+ # seek to the end of the any files.
+ # logs may duplicate without this seek because it's not sure the file is
+ # existent file or rotated new file.
+ pos = fsize
+ @pe.update(inode, pos)
+ end
+ io.seek(pos)
+
+ @io_handler = IOHandler.new(io, @pe, @log, &method(:wrap_receive_lines))
+ else
+ @io_handler = NullIOHandler.new
+ end
+ else
+ log_msg = "detected rotation of #{@path}"
+ log_msg << "; waiting #{@rotate_wait} seconds" if @io_handler.io # wait rotate_time if previous file is exist
+ @log.info log_msg
+
+ if io
+ stat = io.stat
+ inode = stat.ino
+ if inode == @pe.read_inode # truncated
+ @pe.update_pos(stat.size)
+ io_handler = IOHandler.new(io, @pe, @log, &method(:wrap_receive_lines))
+ @io_handler.close
+ @io_handler = io_handler
+ elsif @io_handler.io.nil? # There is no previous file. Reuse TailWatcher
+ @pe.update(inode, io.pos)
+ io_handler = IOHandler.new(io, @pe, @log, &method(:wrap_receive_lines))
+ @io_handler = io_handler
+ else
+ @update_watcher.call(@path, swap_state(@pe))
+ end
+ else
+ @io_handler.close
+ @io_handler = NullIOHandler.new
+ end
+ end
+
+ def swap_state(pe)
+ # Use MemoryPositionEntry for rotated file temporary
+ mpe = MemoryPositionEntry.new
+ mpe.update(pe.read_inode, pe.read_pos)
+ @pe = mpe
+ @io_handler.pe = mpe # Don't re-create IOHandler because IOHandler has an internal buffer.
+
+ pe # This pe will be updated in on_rotate after TailWatcher is initialized
+ end
+ end
+
+ class TimerWatcher < Coolio::TimerWatcher
+ def initialize(interval, repeat, log, &callback)
+ @callback = callback
+ @log = log
+ super(interval, repeat)
+ end
+
+ def on_timer
+ @callback.call
+ rescue
+ # TODO log?
+ @log.error $!.to_s
+ @log.error_backtrace
+ end
+ end
+
+ class StatWatcher < Coolio::StatWatcher
+ def initialize(path, log, &callback)
+ @callback = callback
+ @log = log
+ super(path)
+ end
+
+ def on_change(prev, cur)
+ @callback.call
+ rescue
+ # TODO log?
+ @log.error $!.to_s
+ @log.error_backtrace
+ end
+ end
+
+ class Closer < Coolio::TimerWatcher
+ def initialize(interval, tw, log, &callback)
+ @callback = callback
+ @tw = tw
+ @log = log
+ super(interval, false)
+ end
+
+ def on_timer
+ @callback.call(@tw)
+ rescue => e
+ @log.error e.to_s
+ @log.error_backtrace(e.backtrace)
+ end
+ end
+
+ MAX_LINES_AT_ONCE = 1000
+
+ class IOHandler
+ def initialize(io, pe, log, first = true, &receive_lines)
+ @log = log
+ @log.info "following tail of #{io.path}" if first
+ @io = io
+ @pe = pe
+ @receive_lines = receive_lines
+ @buffer = ''.force_encoding('ASCII-8BIT')
+ @iobuf = ''.force_encoding('ASCII-8BIT')
+ end
+
+ attr_reader :io
+ attr_accessor :pe
+
+ def on_notify
+ begin
+ lines = []
+ read_more = false
+
+ begin
+ while true
+ if @buffer.empty?
+ @io.read_nonblock(2048, @buffer)
+ else
+ @buffer << @io.read_nonblock(2048, @iobuf)
+ end
+ while line = @buffer.slice!(/.*?\n/m)
+ lines << line
+ end
+ if lines.size >= MAX_LINES_AT_ONCE
+ # not to use too much memory in case the file is very large
+ read_more = true
+ break
+ end
+ end
+ rescue EOFError
+ end
+
+ unless lines.empty?
+ @receive_lines.call(lines)
+ @pe.update_pos(@io.pos - @buffer.bytesize)
+ end
+ end while read_more
+
+ rescue
+ @log.error $!.to_s
+ @log.error_backtrace
+ close
+ end
+
+ def close
+ @io.close unless @io.closed?
+ end
+ end
+
+ class NullIOHandler
+ def initialize
+ end
+
+ def io
+ end
+
+ def on_notify
+ end
+
+ def close
+ end
+ end
+
+ class RotateHandler
+ def initialize(path, log, &on_rotate)
+ @path = path
+ @inode = nil
+ @fsize = -1 # first
+ @on_rotate = on_rotate
+ @log = log
+ end
+
+ def on_notify
+ begin
+ io = File.open(@path)
+ stat = io.stat
+ inode = stat.ino
+ fsize = stat.size
+ rescue Errno::ENOENT
+ # moved or deleted
+ inode = nil
+ fsize = 0
+ end
+
+ begin
+ if @inode != inode || fsize < @fsize
+ # rotated or truncated
+ @on_rotate.call(io)
+ io = nil
+ end
+
+ @inode = inode
+ @fsize = fsize
+ ensure
+ io.close if io
+ end
+
+ rescue
+ @log.error $!.to_s
+ @log.error_backtrace
+ end
+ end
+ end
+
+
+ class PositionFile
+ UNWATCHED_POSITION = 0xffffffffffffffff
+
+ def initialize(file, map, last_pos)
+ @file = file
+ @map = map
+ @last_pos = last_pos
+ end
+
+ def [](path)
+ if m = @map[path]
+ return m
+ end
+
+ @file.pos = @last_pos
+ @file.write path
+ @file.write "\t"
+ seek = @file.pos
+ @file.write "0000000000000000\t00000000\n"
+ @last_pos = @file.pos
+
+ @map[path] = FilePositionEntry.new(@file, seek)
+ end
+
+ def self.parse(file)
+ compact(file)
+
+ map = {}
+ file.pos = 0
+ file.each_line {|line|
+ m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
+ next unless m
+ path = m[1]
+ pos = m[2].to_i(16)
+ ino = m[3].to_i(16)
+ seek = file.pos - line.bytesize + path.bytesize + 1
+ map[path] = FilePositionEntry.new(file, seek)
+ }
+ new(file, map, file.pos)
+ end
+
+ # Clean up unwatched file entries
+ def self.compact(file)
+ file.pos = 0
+ existent_entries = file.each_line.select { |line|
+ m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
+ next unless m
+ pos = m[2].to_i(16)
+ pos == UNWATCHED_POSITION ? nil : line
+ }
+
+ file.pos = 0
+ file.truncate(0)
+ file.write(existent_entries.join)
+ end
+ end
+
+ # pos inode
+ # ffffffffffffffff\tffffffff\n
+ class FilePositionEntry
+ POS_SIZE = 16
+ INO_OFFSET = 17
+ INO_SIZE = 8
+ LN_OFFSET = 25
+ SIZE = 26
+
+ def initialize(file, seek)
+ @file = file
+ @seek = seek
+ end
+
+ def update(ino, pos)
+ @file.pos = @seek
+ @file.write "%016x\t%08x" % [pos, ino]
+ end
+
+ def update_pos(pos)
+ @file.pos = @seek
+ @file.write "%016x" % pos
+ end
+
+ def read_inode
+ @file.pos = @seek + INO_OFFSET
+ raw = @file.read(8)
+ raw ? raw.to_i(16) : 0
+ end
+
+ def read_pos
+ @file.pos = @seek
+ raw = @file.read(16)
+ raw ? raw.to_i(16) : 0
+ end
+ end
+
+ class MemoryPositionEntry
+ def initialize
+ @pos = 0
+ @inode = 0
+ end
+
+ def update(ino, pos)
+ @inode = ino
+ @pos = pos
+ end
+
+ def update_pos(pos)
+ @pos = pos
+ end
+
+ def read_pos
+ @pos
+ end
+
+ def read_inode
+ @inode
+ end
+ end
+ end
+
+ # This TailInput is for existence plugins which extends old in_tail
+ # This class will be removed after release v1.
+ class TailInput < Input
def initialize
super
@paths = []
end