Diff of lib/fluent/plugin/out_s3.rb between fluent-plugin-s3-0.8.0.rc1 (old) and fluent-plugin-s3-0.8.0 (new)

- line removed (present only in the old version, 0.8.0.rc1)
+ line added (present only in the new version, 0.8.0)

@@ -99,10 +99,14 @@
 config_param :acl, :string, :default => nil
 desc "The length of `%{hex_random}` placeholder(4-16)"
 config_param :hex_random_length, :integer, :default => 4
 desc "Overwrite already existing path"
 config_param :overwrite, :bool, :default => false
+desc "Check bucket if exists or not"
+config_param :check_bucket, :bool, :default => true
+desc "Check object before creation"
+config_param :check_object, :bool, :default => true
 desc "Specifies the AWS KMS key ID to use for object encryption"
 config_param :ssekms_key_id, :string, :default => nil, :secret => true
 desc "Specifies the algorithm to use to when encrypting the object"
 config_param :sse_customer_algorithm, :string, :default => nil
 desc "Specifies the customer-provided encryption key for Amazon S3 to use in encrypting data"
@@ -173,12 +177,16 @@
   s3_client = Aws::S3::Client.new(options)
   @s3 = Aws::S3::Resource.new(:client => s3_client)
   @bucket = @s3.bucket(@s3_bucket)

   check_apikeys if @check_apikey_on_start
-  ensure_bucket
+  ensure_bucket if @check_bucket

+  if !@check_object
+    @s3_object_key_format = "%{path}/%{date_slice}_%{hms_slice}.%{file_extension}"
+  end
+
   super
 end

 def format(tag, time, record)
   @formatter.format(tag, time, record)
@@ -186,38 +194,56 @@
 def write(chunk)
   i = 0
   previous_path = nil

-  begin
-    path = @path_slicer.call(@path)
+  if @check_object
+    begin
+      path = @path_slicer.call(@path)

-    @values_for_s3_object_chunk[chunk.unique_id] ||= {
-      "hex_random" => hex_random(chunk),
-    }
+      @values_for_s3_object_chunk[chunk.unique_id] ||= {
+        "hex_random" => hex_random(chunk),
+      }
+      values_for_s3_object_key = {
+        "path" => path,
+        "time_slice" => chunk.key,
+        "file_extension" => @compressor.ext,
+        "index" => i,
+      }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
+      values_for_s3_object_key['uuid_flush'.freeze] = uuid_random if @uuid_flush_enabled
+
+      s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
+        values_for_s3_object_key[expr[2...expr.size-1]]
+      }
+      if (i > 0) && (s3path == previous_path)
+        if @overwrite
+          log.warn "#{s3path} already exists, but will overwrite"
+          break
+        else
+          raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
+        end
+      end
+
+      i += 1
+      previous_path = s3path
+    end while @bucket.object(s3path).exists?
+  else
+    if @localtime
+      hms_slicer = Time.now.strftime("%H%M%S")
+    else
+      hms_slicer = Time.now.utc.strftime("%H%M%S")
+    end
+
     values_for_s3_object_key = {
-      "path" => path,
-      "time_slice" => chunk.key,
+      "path" => @path_slicer.call(@path),
+      "date_slice" => chunk.key,
       "file_extension" => @compressor.ext,
-      "index" => i,
-    }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
-    values_for_s3_object_key['uuid_flush'.freeze] = uuid_random if @uuid_flush_enabled
-
+      "hms_slice" => hms_slicer,
+    }
     s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
       values_for_s3_object_key[expr[2...expr.size-1]]
     }
-    if (i > 0) && (s3path == previous_path)
-      if @overwrite
-        log.warn "#{s3path} already exists, but will overwrite"
-        break
-      else
-        raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
-      end
-    end
-
-    i += 1
-    previous_path = s3path
-  end while @bucket.object(s3path).exists?
+  end

   tmp = Tempfile.new("s3-")
   tmp.binmode
   begin
     @compressor.compress(chunk, tmp)