lib/fluent/plugin/in_cat_sweep.rb in fluent-plugin-cat-sweep-0.1.1 vs lib/fluent/plugin/in_cat_sweep.rb in fluent-plugin-cat-sweep-0.1.2

- old
+ new

@@ -17,13 +17,13 @@ config_param :error_file_suffix, :string, :default => '.error' config_param :line_terminated_by, :string, :default => "\n" config_param :oneline_max_bytes, :integer, :default => 536870912 # 512MB config_param :move_to, :string, :default => '/tmp' config_param :remove_after_processing, :bool, :default => false - config_param :run_interval, :integer, :default => 5 + config_param :run_interval, :time, :default => 5 + config_param :file_event_stream, :bool, :default => false - # To support log_level option implemented by Fluentd v0.10.43 unless method_defined?(:log) define_method("log") { $log } end @@ -182,25 +182,55 @@ end yield(buffer.chomp!(@line_terminated_by)) buffer_clean! end - def emit_message(message) - if message - @parser.parse(message) do |time, record| - unless time and record - raise FormatError, - "in_cat_sweep: pattern not match: #{message.inspect}" - end + def emit_line(line) + if line + time, record = parse_line(line) + if time and record router.emit(@tag, time, record) end end end + def emit_file(fp) + entries = [] + read_each_line(fp) do |line| + if line + entry = parse_line(line) + entries << entry if entry + end + end + unless entries.empty? + es = ArrayEventStream.new(entries) + router.emit_stream(@tag, es) + end + end + + def parse_line(line) + entry = nil + @parser.parse(line) do |time, record| + if time && record + entry = [time, record] + else + # We want to fail an entire file on `pattern not match` + # This behavior makes it easy to recover with manual fix operation + raise FormatError, + "in_cat_sweep: pattern not match: #{line.inspect}" + end + end + entry + end + def process(original_filename, processing_filename) File.open(processing_filename, 'r') do |tfile| - read_each_line(tfile) do |line| - emit_message(line) + if @file_event_stream + emit_file(tfile) + else + read_each_line(tfile) do |line| + emit_line(line) + end end log.debug { %[in_cat_sweep: process: {filename:"#{original_filename}",size:#{tfile.size}}] } end end