lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.6 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.7

- old
+ new

@@ -103,10 +103,14 @@ config_param :overwrite, :bool, :default => false desc "Specifies the AWS KMS key ID to use for object encryption" config_param :ssekms_key_id, :string, :default => nil, :secret => true desc "AWS SDK uses MD5 for API request/response by default" config_param :compute_checksums, :bool, :default => nil # use nil to follow SDK default configuration + desc "Signature version for API Request (s3,v4)" + config_param :signature_version, :string, :default => nil # use nil to follow SDK default configuration + desc "Given a threshold to treat events as delay, output warning logs if delayed events were put into s3" + config_param :warn_for_delay, :time, :default => nil attr_reader :bucket include Fluent::Mixin::ConfigPlaceholders MAX_HEX_RANDOM_LENGTH = 16 @@ -157,10 +161,11 @@ options[:region] = @s3_region if @s3_region options[:endpoint] = @s3_endpoint if @s3_endpoint options[:http_proxy] = @proxy_uri if @proxy_uri options[:force_path_style] = @force_path_style options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil? + options[:signature_version] = @signature_version unless @signature_version.nil? s3_client = Aws::S3::Client.new(options) @s3 = Aws::S3::Resource.new(:client => s3_client) @bucket = @s3.bucket(@s3_bucket) @@ -225,9 +230,15 @@ put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id put_options[:acl] = @acl if @acl @bucket.object(s3path).put(put_options) @values_for_s3_object_chunk.delete(chunk.unique_id) + + if @warn_for_delay + if Time.strptime(chunk.key, @time_slice_format) < Time.now - @warn_for_delay + log.warn { "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}" } + end + end ensure tmp.close(true) rescue nil end end