lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.5.11 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.0.pre1
- old
+ new
@@ -4,33 +4,53 @@
class S3Output < Fluent::TimeSlicedOutput
Fluent::Plugin.register_output('s3', self)
def initialize
super
- require 'aws-sdk-v1'
+ require 'aws-sdk-resources'
require 'zlib'
require 'time'
require 'tempfile'
@compressor = nil
end
config_param :path, :string, :default => ""
- config_param :use_ssl, :bool, :default => true
config_param :use_server_side_encryption, :string, :default => nil
config_param :aws_key_id, :string, :default => nil, :secret => true
config_param :aws_sec_key, :string, :default => nil, :secret => true
+ config_section :assume_role_credentials, :multi => false do
+ config_param :role_arn, :string
+ config_param :role_session_name, :string
+ config_param :policy, :string, :default => nil
+ config_param :duration_seconds, :integer, :default => nil
+ config_param :external_id, :string, :default => nil
+ end
+ config_section :instance_profile_credentials, :multi => false do
+ config_param :retries, :integer, :default => nil
+ config_param :ip_address, :string, :default => nil
+ config_param :port, :integer, :default => nil
+ config_param :http_open_timeout, :float, :default => nil
+ config_param :http_read_timeout, :float, :default => nil
+ # config_param :delay, :integer or :proc, :default => nil
+ # config_param :http_degub_output, :io, :default => nil
+ end
+ config_section :shared_credentials, :multi => false do
+ config_param :path, :string, :default => nil
+ config_param :profile_name, :string, :default => nil
+ end
config_param :aws_iam_retries, :integer, :default => 5
config_param :s3_bucket, :string
- config_param :s3_region, :string, :default => nil
+ config_param :s3_region, :string, :default => ENV["AWS_REGION"] || "us-east-1"
config_param :s3_endpoint, :string, :default => nil
config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}"
config_param :store_as, :string, :default => "gzip"
config_param :auto_create_bucket, :bool, :default => true
config_param :check_apikey_on_start, :bool, :default => true
config_param :proxy_uri, :string, :default => nil
config_param :reduced_redundancy, :bool, :default => false
+ config_param :storage_class, :string, :default => "STANDARD"
config_param :format, :string, :default => 'out_file'
config_param :acl, :string, :default => :private
attr_reader :bucket
@@ -66,31 +86,59 @@
else
@path_slicer = Proc.new {|path|
Time.now.utc.strftime(path)
}
end
+
+ @storage_class = "REDUCED_REDUNDANCY" if @reduced_redundancy
end
def start
super
options = {}
- if @aws_key_id && @aws_sec_key
+ credentials_options = {}
+ case
+ when @aws_key_id && @aws_sec_key
options[:access_key_id] = @aws_key_id
options[:secret_access_key] = @aws_sec_key
- elsif ENV.key? "AWS_ACCESS_KEY_ID"
- options[:credential_provider] = AWS::Core::CredentialProviders::ENVProvider.new('AWS')
+ when @assume_role_credentials
+ c = @assume_role_credentials
+ credentials_options[:role_arn] = c.role_arn
+ credentials_options[:role_session_name] = c.role_session_name
+ credentials_options[:policy] = c.policy if c.policy
+ credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
+ credentials_options[:external_id] = c.external_id if c.external_id
+ options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options)
+ when @instance_profile_credentials
+ c = @instance_profile_credentials
+ credentials_options[:retries] = c.retries if c.retries
+ credentials_options[:ip_address] = c.ip_address if c.ip_address
+ credentials_options[:port] = c.port if c.port
+ credentials_options[:http_open_timeout] = c.http_open_timeout if c.http_open_timeout
+ credentials_options[:http_read_timeout] = c.http_read_timeout if c.http_read_timeout
+ options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
+ when @shared_credentials
+ c = @shared_credentials
+ credentials_options[:path] = c.path if c.path
+ credentials_options[:profile_name] = c.profile_name if c.profile_name
+ options[:credentials] = Aws::SharedCredentials.new(credentials_options)
+ when @aws_iam_retries
+ $log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead")
+ credentials_options[:retries] = @aws_iam_retries
+ options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
else
- options[:credential_provider] = AWS::Core::CredentialProviders::EC2Provider.new({:retries => @aws_iam_retries})
+ # Use default credentials
+ # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html
end
options[:region] = @s3_region if @s3_region
- options[:s3_endpoint] = @s3_endpoint if @s3_endpoint
- options[:proxy_uri] = @proxy_uri if @proxy_uri
- options[:use_ssl] = @use_ssl
+ options[:endpoint] = @s3_endpoint if @s3_endpoint
+ options[:http_proxy] = @proxy_uri if @proxy_uri
options[:s3_server_side_encryption] = @use_server_side_encryption.to_sym if @use_server_side_encryption
- @s3 = AWS::S3.new(options)
- @bucket = @s3.buckets[@s3_bucket]
+ s3_client = Aws::S3::Client.new(options)
+ @s3 = Aws::S3::Resource.new(:client => s3_client)
+ @bucket = @s3.bucket(@s3_bucket)
check_apikeys if @check_apikey_on_start
ensure_bucket
end
@@ -118,18 +166,19 @@
raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
end
i += 1
previous_path = s3path
- end while @bucket.objects[s3path].exists?
+ end while @bucket.object(s3path).exists?
tmp = Tempfile.new("s3-")
begin
@compressor.compress(chunk, tmp)
- @bucket.objects[s3path].write(Pathname.new(tmp.path), {:content_type => @compressor.content_type,
- :reduced_redundancy => @reduced_redundancy,
- :acl => @acl})
+ tmp.rewind
+ @bucket.object(s3path).put(:body => tmp,
+ :content_type => @compressor.content_type,
+ :storage_class => @storage_class)
ensure
tmp.close(true) rescue nil
end
end
@@ -137,20 +186,20 @@
def ensure_bucket
if !@bucket.exists?
if @auto_create_bucket
log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
- @s3.buckets.create(@s3_bucket)
+ @s3.create_bucket(:bucket => @s3_bucket)
else
raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
end
end
end
def check_apikeys
- @bucket.empty?
- rescue AWS::S3::Errors::NoSuchBucket
+ @bucket.objects.first
+ rescue Aws::S3::Errors::NoSuchBucket
# ignore NoSuchBucket Error because ensure_bucket checks it.
rescue => e
raise "can't call S3 API. Please check your aws_key_id / aws_sec_key or s3_region configuration. error = #{e.inspect}"
end
@@ -202,13 +251,13 @@
end
def compress(chunk, tmp)
w = Zlib::GzipWriter.new(tmp)
chunk.write_to(w)
- w.close
+ w.finish
ensure
- w.close rescue nil
+ w.finish rescue nil
end
end
class TextCompressor < Compressor
def ext
@@ -219,10 +268,9 @@
'text/plain'.freeze
end
def compress(chunk, tmp)
chunk.write_to(tmp)
- tmp.close
end
end
class JsonCompressor < TextCompressor
def ext