lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.6 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.7
- old
+ new
@@ -103,10 +103,14 @@
config_param :overwrite, :bool, :default => false
desc "Specifies the AWS KMS key ID to use for object encryption"
config_param :ssekms_key_id, :string, :default => nil, :secret => true
desc "AWS SDK uses MD5 for API request/response by default"
config_param :compute_checksums, :bool, :default => nil # use nil to follow SDK default configuration
+ desc "Signature version for API Request (s3,v4)"
+ config_param :signature_version, :string, :default => nil # use nil to follow SDK default configuration
+ desc "Given a threshold to treat events as delay, output warning logs if delayed events were put into s3"
+ config_param :warn_for_delay, :time, :default => nil
attr_reader :bucket
include Fluent::Mixin::ConfigPlaceholders
MAX_HEX_RANDOM_LENGTH = 16
@@ -157,10 +161,11 @@
options[:region] = @s3_region if @s3_region
options[:endpoint] = @s3_endpoint if @s3_endpoint
options[:http_proxy] = @proxy_uri if @proxy_uri
options[:force_path_style] = @force_path_style
options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil?
+ options[:signature_version] = @signature_version unless @signature_version.nil?
s3_client = Aws::S3::Client.new(options)
@s3 = Aws::S3::Resource.new(:client => s3_client)
@bucket = @s3.bucket(@s3_bucket)
@@ -225,9 +230,15 @@
put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
put_options[:acl] = @acl if @acl
@bucket.object(s3path).put(put_options)
@values_for_s3_object_chunk.delete(chunk.unique_id)
+
+ if @warn_for_delay
+ if Time.strptime(chunk.key, @time_slice_format) < Time.now - @warn_for_delay
+ log.warn { "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}" }
+ end
+ end
ensure
tmp.close(true) rescue nil
end
end