lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.5.11 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.0.pre1

- old
+ new

@@ -4,33 +4,53 @@ class S3Output < Fluent::TimeSlicedOutput Fluent::Plugin.register_output('s3', self) def initialize super - require 'aws-sdk-v1' + require 'aws-sdk-resources' require 'zlib' require 'time' require 'tempfile' @compressor = nil end config_param :path, :string, :default => "" - config_param :use_ssl, :bool, :default => true config_param :use_server_side_encryption, :string, :default => nil config_param :aws_key_id, :string, :default => nil, :secret => true config_param :aws_sec_key, :string, :default => nil, :secret => true + config_section :assume_role_credentials, :multi => false do + config_param :role_arn, :string + config_param :role_session_name, :string + config_param :policy, :string, :default => nil + config_param :duration_seconds, :integer, :default => nil + config_param :external_id, :string, :default => nil + end + config_section :instance_profile_credentials, :multi => false do + config_param :retries, :integer, :default => nil + config_param :ip_address, :string, :default => nil + config_param :port, :integer, :default => nil + config_param :http_open_timeout, :float, :default => nil + config_param :http_read_timeout, :float, :default => nil + # config_param :delay, :integer or :proc, :default => nil + # config_param :http_degub_output, :io, :default => nil + end + config_section :shared_credentials, :multi => false do + config_param :path, :string, :default => nil + config_param :profile_name, :string, :default => nil + end config_param :aws_iam_retries, :integer, :default => 5 config_param :s3_bucket, :string - config_param :s3_region, :string, :default => nil + config_param :s3_region, :string, :default => ENV["AWS_REGION"] || "us-east-1" config_param :s3_endpoint, :string, :default => nil config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}" config_param :store_as, :string, :default => "gzip" config_param :auto_create_bucket, :bool, :default => true config_param :check_apikey_on_start, :bool, :default => true config_param :proxy_uri, :string, :default => nil config_param :reduced_redundancy, :bool, :default => false + config_param :storage_class, :string, :default => "STANDARD" config_param :format, :string, :default => 'out_file' config_param :acl, :string, :default => :private attr_reader :bucket @@ -66,31 +86,59 @@ else @path_slicer = Proc.new {|path| Time.now.utc.strftime(path) } end + + @storage_class = "REDUCED_REDUNDANCY" if @reduced_redundancy end def start super options = {} - if @aws_key_id && @aws_sec_key + credentials_options = {} + case + when @aws_key_id && @aws_sec_key options[:access_key_id] = @aws_key_id options[:secret_access_key] = @aws_sec_key - elsif ENV.key? "AWS_ACCESS_KEY_ID" - options[:credential_provider] = AWS::Core::CredentialProviders::ENVProvider.new('AWS') + when @assume_role_credentials + c = @assume_role_credentials + credentials_options[:role_arn] = c.role_arn + credentials_options[:role_session_name] = c.role_session_name + credentials_options[:policy] = c.policy if c.policy + credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds + credentials_options[:external_id] = c.external_id if c.external_id + options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options) + when @instance_profile_credentials + c = @instance_profile_credentials + credentials_options[:retries] = c.retries if c.retries + credentials_options[:ip_address] = c.ip_address if c.ip_address + credentials_options[:port] = c.port if c.port + credentials_options[:http_open_timeout] = c.http_open_timeout if c.http_open_timeout + credentials_options[:http_read_timeout] = c.http_read_timeout if c.http_read_timeout + options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options) + when @shared_credentials + c = @shared_credentials + credentials_options[:path] = c.path if c.path + credentials_options[:profile_name] = c.profile_name if c.profile_name + options[:credentials] = Aws::SharedCredentials.new(credentials_options) + when @aws_iam_retries + $log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead") + credentials_options[:retries] = @aws_iam_retries + options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options) else - options[:credential_provider] = AWS::Core::CredentialProviders::EC2Provider.new({:retries => @aws_iam_retries}) + # Use default credentials + # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html end options[:region] = @s3_region if @s3_region - options[:s3_endpoint] = @s3_endpoint if @s3_endpoint - options[:proxy_uri] = @proxy_uri if @proxy_uri - options[:use_ssl] = @use_ssl + options[:endpoint] = @s3_endpoint if @s3_endpoint + options[:http_proxy] = @proxy_uri if @proxy_uri options[:s3_server_side_encryption] = @use_server_side_encryption.to_sym if @use_server_side_encryption - @s3 = AWS::S3.new(options) - @bucket = @s3.buckets[@s3_bucket] + s3_client = Aws::S3::Client.new(options) + @s3 = Aws::S3::Resource.new(:client => s3_client) + @bucket = @s3.bucket(@s3_bucket) check_apikeys if @check_apikey_on_start ensure_bucket end @@ -118,18 +166,19 @@ raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}" end i += 1 previous_path = s3path - end while @bucket.objects[s3path].exists? + end while @bucket.object(s3path).exists? tmp = Tempfile.new("s3-") begin @compressor.compress(chunk, tmp) - @bucket.objects[s3path].write(Pathname.new(tmp.path), {:content_type => @compressor.content_type, - :reduced_redundancy => @reduced_redundancy, - :acl => @acl}) + tmp.rewind + @bucket.object(s3path).put(:body => tmp, + :content_type => @compressor.content_type, + :storage_class => @storage_class) ensure tmp.close(true) rescue nil end end @@ -137,20 +186,20 @@ def ensure_bucket if !@bucket.exists? if @auto_create_bucket log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}" - @s3.buckets.create(@s3_bucket) + @s3.create_bucket(:bucket => @s3_bucket) else raise "The specified bucket does not exist: bucket = #{@s3_bucket}" end end end def check_apikeys - @bucket.empty? - rescue AWS::S3::Errors::NoSuchBucket + @bucket.objects.first + rescue Aws::S3::Errors::NoSuchBucket # ignore NoSuchBucket Error because ensure_bucket checks it. rescue => e raise "can't call S3 API. Please check your aws_key_id / aws_sec_key or s3_region configuration. error = #{e.inspect}" end @@ -202,13 +251,13 @@ end def compress(chunk, tmp) w = Zlib::GzipWriter.new(tmp) chunk.write_to(w) - w.close + w.finish ensure - w.close rescue nil + w.finish rescue nil end end class TextCompressor < Compressor def ext @@ -219,10 +268,9 @@ 'text/plain'.freeze end def compress(chunk, tmp) chunk.write_to(tmp) - tmp.close end end class JsonCompressor < TextCompressor def ext