lib/fluent/plugin/in_cat_sweep.rb in fluent-plugin-cat-sweep-0.1.1 vs lib/fluent/plugin/in_cat_sweep.rb in fluent-plugin-cat-sweep-0.1.2
- old
+ new
@@ -17,13 +17,13 @@
config_param :error_file_suffix, :string, :default => '.error'
config_param :line_terminated_by, :string, :default => "\n"
config_param :oneline_max_bytes, :integer, :default => 536870912 # 512MB
config_param :move_to, :string, :default => '/tmp'
config_param :remove_after_processing, :bool, :default => false
- config_param :run_interval, :integer, :default => 5
+ config_param :run_interval, :time, :default => 5
+ config_param :file_event_stream, :bool, :default => false
-
# To support log_level option implemented by Fluentd v0.10.43
unless method_defined?(:log)
define_method("log") { $log }
end
@@ -182,25 +182,55 @@
end
yield(buffer.chomp!(@line_terminated_by))
buffer_clean!
end
- def emit_message(message)
- if message
- @parser.parse(message) do |time, record|
- unless time and record
- raise FormatError,
- "in_cat_sweep: pattern not match: #{message.inspect}"
- end
+ def emit_line(line)
+ if line
+ time, record = parse_line(line)
+ if time and record
router.emit(@tag, time, record)
end
end
end
+ def emit_file(fp)
+ entries = []
+ read_each_line(fp) do |line|
+ if line
+ entry = parse_line(line)
+ entries << entry if entry
+ end
+ end
+ unless entries.empty?
+ es = ArrayEventStream.new(entries)
+ router.emit_stream(@tag, es)
+ end
+ end
+
+ def parse_line(line)
+ entry = nil
+ @parser.parse(line) do |time, record|
+ if time && record
+ entry = [time, record]
+ else
+ # We want to fail an entire file on `pattern not match`
+ # This behavior makes it easy to recover with manual fix operation
+ raise FormatError,
+ "in_cat_sweep: pattern not match: #{line.inspect}"
+ end
+ end
+ entry
+ end
+
def process(original_filename, processing_filename)
File.open(processing_filename, 'r') do |tfile|
- read_each_line(tfile) do |line|
- emit_message(line)
+ if @file_event_stream
+ emit_file(tfile)
+ else
+ read_each_line(tfile) do |line|
+ emit_line(line)
+ end
end
log.debug { %[in_cat_sweep: process: {filename:"#{original_filename}",size:#{tfile.size}}] }
end
end