# Fluentd input plugin for general hardware monitoring # Copyright (C) 2016 Nexedi SA and Contributors. # Klaus Wölfel # # This program is free software: you can Use, Study, Modify and Redistribute # it under the terms of the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. require 'fluent/input' module Fluent class BinInput < NewTailInput Plugin.register_input('bin', self) def convert_line_to_event(line, es, tail_watcher) begin es.add(nil, line) rescue => e log.warn line.dump, error: e.to_s log.debug_backtrace(e.backtrace) end end def setup_watcher(path, pe) line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, &method(:receive_lines)) tw.attach(@loop) tw end class TailWatcher < NewTailInput::TailWatcher def on_rotate(io) if @io_handler == nil if io # first time stat = io.stat fsize = stat.size inode = stat.ino last_inode = @pe.read_inode if inode == last_inode # rotated file has the same inode number with the last file. # assuming following situation: # a) file was once renamed and backed, or # b) symlink or hardlink to the same file is recreated # in either case, seek to the saved position pos = @pe.read_pos elsif last_inode != 0 # this is FilePositionEntry and fluentd once started. # read data from the head of the rotated file. # logs never duplicate because this file is a rotated new file. pos = 0 @pe.update(inode, pos) else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end io.seek(pos) @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines)) else @io_handler = NullIOHandler.new end else log_msg = "detected rotation of #{@path}" log_msg << "; waiting #{@rotate_wait} seconds" if @io_handler.io # wait rotate_time if previous file is exist @log.info log_msg if io stat = io.stat inode = stat.ino if inode == @pe.read_inode # truncated @pe.update_pos(stat.size) io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines)) @io_handler.close @io_handler = io_handler elsif @io_handler.io.nil? # There is no previous file. Reuse TailWatcher @pe.update(inode, io.pos) io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines)) @io_handler = io_handler else # file is rotated and new file found @update_watcher.call(@path, swap_state(@pe)) end else # file is rotated and new file not found # Clear RotateHandler to avoid duplicated file watch in same path. @rotate_handler = nil @update_watcher.call(@path, swap_state(@pe)) end end def swap_state(pe) # Use MemoryPositionEntry for rotated file temporary mpe = MemoryPositionEntry.new mpe.update(pe.read_inode, pe.read_pos) @pe = mpe @io_handler.pe = mpe # Don't re-create IOHandler because IOHandler has an internal buffer. pe # This pe will be updated in on_rotate after TailWatcher is initialized end end class IOHandler < NewTailInput::TailWatcher::IOHandler def initialize(io, pe, log, read_lines_limit, first = true, &receive_lines) @log = log @log.info "following #{io.path}" if first @io = io @pe = pe @read_lines_limit = read_lines_limit @receive_lines = receive_lines @lines = [] @io.binmode end def on_notify begin read_more = false if @lines.empty? begin while true @lines << @io.readpartial(2048) if @lines.size >= @read_lines_limit # not to use too much memory in case the file is very large read_more = true break end end rescue EOFError end end unless @lines.empty? if @receive_lines.call(@lines) @pe.update_pos(@io.pos) @lines.clear else read_more = false end end end while read_more rescue @log.error $!.to_s @log.error_backtrace close end end end end end