lib/fluent/plugin/in_cat_sweep.rb in fluent-plugin-cat-sweep-0.0.1 vs lib/fluent/plugin/in_cat_sweep.rb in fluent-plugin-cat-sweep-0.1.0
- old
+ new
@@ -48,12 +48,24 @@
if @line_terminated_by.empty?
raise Fluent::ConfigError, "in_cat_sweep: `line_terminated_by` must has some letters."
end
- if !remove_file? and !Dir.exists?(@move_to)
- raise Fluent::ConfigError, "in_cat_sweep: `move_to` directory must be existed."
+ if !remove_file?
+ first_filename = Dir.glob(@file_path_with_glob).first
+ dirname = first_filename ? move_dirname(first_filename) : @move_to
+ if Dir.exist?(dirname)
+ if !File.writable?(dirname)
+ raise Fluent::ConfigError, "in_cat_sweep: `move_to` directory (#{dirname}) must be writable."
+ end
+ else
+ begin
+ FileUtils.mkdir_p(dirname)
+ rescue => e
+ raise Fluent::ConfigError, "in_cat_sweep: `move_to` directory (#{dirname}) must be writable."
+ end
+ end
end
@read_bytes_once = 262144 # 256 KB
end
@@ -80,14 +92,13 @@
lock_with_renaming(filename, processing_filename) do
process(processing_filename)
after_processing(processing_filename)
end
rescue => e
- log.error "in_cat_sweep: processing error: #{e}, file: #{processing_filename}",
- :error => e, :error_class => e.class
+ log.error "in_cat_sweep: processing: #{processing_filename}", :error => e, :error_class => e.class
log.error_backtrace
- safe_fail(processing_filename)
+ safe_fail(e, processing_filename)
end
end
end
end
@@ -110,23 +121,30 @@
end
def get_processing_filename(filename)
tmpfile = String.new
tmpfile << filename << '.' << Process.pid.to_s << '.'
- tmpfile << Time.now.to_i.to_s << '.' << @processing_file_suffix
+ tmpfile << Time.now.to_i.to_s << @processing_file_suffix
end
- def get_error_filename(filename)
+ def revert_processing_filename(processing_filename)
+ tmpfile = processing_filename.dup
+ tmpfile.chomp!(@processing_file_suffix)
+ tmpfile.gsub!(/\.\d+\.\d+$/, '')
+ end
+
+ def get_error_filename(e, filename)
errfile = String.new
- errfile << filename << '.' << @error_file_suffix
+ errfile << filename << "." << e.class.to_s << @error_file_suffix
end
- def safe_fail(filename)
+ def safe_fail(e, filename)
begin
- lock_with_renaming(filename, get_error_filename(filename))
+ error_filename = get_error_filename(e, filename)
+ lock_with_renaming(filename, error_filename)
rescue => e
- log.error "in_cat_sweep: rename #{filename} to error name. message: #{e}",
+ log.error "in_cat_sweep: rename #{filename} to error filename #{error_filename}",
:error => e, :error_class => e.class
log.error_backtrace
end
end
@@ -199,14 +217,21 @@
file.flock(File::LOCK_UN) # release the lock
file.close
end
end
- def after_processing(filename)
+ def move_dirname(filename)
+ File.join(@move_to, File.dirname(File.expand_path(filename)))
+ end
+
+ def after_processing(processing_filename)
if remove_file?
- FileUtils.rm(filename)
+ FileUtils.rm(processing_filename)
else
- FileUtils.mv(filename, @move_to)
+ dirname = move_dirname(processing_filename)
+ FileUtils.mkdir_p(dirname)
+ filename = revert_processing_filename(File.basename(processing_filename))
+ FileUtils.mv(processing_filename, File.join(dirname, filename))
end
end
end
end