lib/fluent/plugin/in_cat_sweep.rb in fluent-plugin-cat-sweep-0.0.1 vs lib/fluent/plugin/in_cat_sweep.rb in fluent-plugin-cat-sweep-0.1.0

- old
+ new

@@ -48,12 +48,24 @@ if @line_terminated_by.empty? raise Fluent::ConfigError, "in_cat_sweep: `line_terminated_by` must has some letters." end - if !remove_file? and !Dir.exists?(@move_to) - raise Fluent::ConfigError, "in_cat_sweep: `move_to` directory must be existed." + if !remove_file? + first_filename = Dir.glob(@file_path_with_glob).first + dirname = first_filename ? move_dirname(first_filename) : @move_to + if Dir.exist?(dirname) + if !File.writable?(dirname) + raise Fluent::ConfigError, "in_cat_sweep: `move_to` directory (#{dirname}) must be writable." + end + else + begin + FileUtils.mkdir_p(dirname) + rescue => e + raise Fluent::ConfigError, "in_cat_sweep: `move_to` directory (#{dirname}) must be writable." + end + end end @read_bytes_once = 262144 # 256 KB end @@ -80,14 +92,13 @@ lock_with_renaming(filename, processing_filename) do process(processing_filename) after_processing(processing_filename) end rescue => e - log.error "in_cat_sweep: processing error: #{e}, file: #{processing_filename}", - :error => e, :error_class => e.class + log.error "in_cat_sweep: processing: #{processing_filename}", :error => e, :error_class => e.class log.error_backtrace - safe_fail(processing_filename) + safe_fail(e, processing_filename) end end end end @@ -110,23 +121,30 @@ end def get_processing_filename(filename) tmpfile = String.new tmpfile << filename << '.' << Process.pid.to_s << '.' - tmpfile << Time.now.to_i.to_s << '.' << @processing_file_suffix + tmpfile << Time.now.to_i.to_s << @processing_file_suffix end - def get_error_filename(filename) + def revert_processing_filename(processing_filename) + tmpfile = processing_filename.dup + tmpfile.chomp!(@processing_file_suffix) + tmpfile.gsub!(/\.\d+\.\d+$/, '') + end + + def get_error_filename(e, filename) errfile = String.new - errfile << filename << '.' << @error_file_suffix + errfile << filename << "." << e.class.to_s << @error_file_suffix end - def safe_fail(filename) + def safe_fail(e, filename) begin - lock_with_renaming(filename, get_error_filename(filename)) + error_filename = get_error_filename(e, filename) + lock_with_renaming(filename, error_filename) rescue => e - log.error "in_cat_sweep: rename #{filename} to error name. message: #{e}", + log.error "in_cat_sweep: rename #{filename} to error filename #{error_filename}", :error => e, :error_class => e.class log.error_backtrace end end @@ -199,14 +217,21 @@ file.flock(File::LOCK_UN) # release the lock file.close end end - def after_processing(filename) + def move_dirname(filename) + File.join(@move_to, File.dirname(File.expand_path(filename))) + end + + def after_processing(processing_filename) if remove_file? - FileUtils.rm(filename) + FileUtils.rm(processing_filename) else - FileUtils.mv(filename, @move_to) + dirname = move_dirname(processing_filename) + FileUtils.mkdir_p(dirname) + filename = revert_processing_filename(File.basename(processing_filename)) + FileUtils.mv(processing_filename, File.join(dirname, filename)) end end end end