lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.8.0.rc1 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.8.0
- old
+ new
@@ -99,10 +99,14 @@
config_param :acl, :string, :default => nil
desc "The length of `%{hex_random}` placeholder(4-16)"
config_param :hex_random_length, :integer, :default => 4
desc "Overwrite already existing path"
config_param :overwrite, :bool, :default => false
+ desc "Check bucket if exists or not"
+ config_param :check_bucket, :bool, :default => true
+ desc "Check object before creation"
+ config_param :check_object, :bool, :default => true
desc "Specifies the AWS KMS key ID to use for object encryption"
config_param :ssekms_key_id, :string, :default => nil, :secret => true
desc "Specifies the algorithm to use to when encrypting the object"
config_param :sse_customer_algorithm, :string, :default => nil
desc "Specifies the customer-provided encryption key for Amazon S3 to use in encrypting data"
@@ -173,12 +177,16 @@
s3_client = Aws::S3::Client.new(options)
@s3 = Aws::S3::Resource.new(:client => s3_client)
@bucket = @s3.bucket(@s3_bucket)
check_apikeys if @check_apikey_on_start
- ensure_bucket
+ ensure_bucket if @check_bucket
+ if !@check_object
+ @s3_object_key_format = "%{path}/%{date_slice}_%{hms_slice}.%{file_extension}"
+ end
+
super
end
def format(tag, time, record)
@formatter.format(tag, time, record)
@@ -186,38 +194,56 @@
def write(chunk)
i = 0
previous_path = nil
- begin
- path = @path_slicer.call(@path)
+ if @check_object
+ begin
+ path = @path_slicer.call(@path)
- @values_for_s3_object_chunk[chunk.unique_id] ||= {
- "hex_random" => hex_random(chunk),
- }
+ @values_for_s3_object_chunk[chunk.unique_id] ||= {
+ "hex_random" => hex_random(chunk),
+ }
+ values_for_s3_object_key = {
+ "path" => path,
+ "time_slice" => chunk.key,
+ "file_extension" => @compressor.ext,
+ "index" => i,
+ }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
+ values_for_s3_object_key['uuid_flush'.freeze] = uuid_random if @uuid_flush_enabled
+
+ s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
+ values_for_s3_object_key[expr[2...expr.size-1]]
+ }
+ if (i > 0) && (s3path == previous_path)
+ if @overwrite
+ log.warn "#{s3path} already exists, but will overwrite"
+ break
+ else
+ raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
+ end
+ end
+
+ i += 1
+ previous_path = s3path
+ end while @bucket.object(s3path).exists?
+ else
+ if @localtime
+ hms_slicer = Time.now.strftime("%H%M%S")
+ else
+ hms_slicer = Time.now.utc.strftime("%H%M%S")
+ end
+
values_for_s3_object_key = {
- "path" => path,
- "time_slice" => chunk.key,
+ "path" => @path_slicer.call(@path),
+ "date_slice" => chunk.key,
"file_extension" => @compressor.ext,
- "index" => i,
- }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
- values_for_s3_object_key['uuid_flush'.freeze] = uuid_random if @uuid_flush_enabled
-
+ "hms_slice" => hms_slicer,
+ }
s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
values_for_s3_object_key[expr[2...expr.size-1]]
}
- if (i > 0) && (s3path == previous_path)
- if @overwrite
- log.warn "#{s3path} already exists, but will overwrite"
- break
- else
- raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
- end
- end
-
- i += 1
- previous_path = s3path
- end while @bucket.object(s3path).exists?
+ end
tmp = Tempfile.new("s3-")
tmp.binmode
begin
@compressor.compress(chunk, tmp)