lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.0.pre1 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.0

- old
+ new

@@ -1,7 +1,8 @@ module Fluent require 'fluent/mixin/config_placeholders' + require 'securerandom' class S3Output < Fluent::TimeSlicedOutput Fluent::Plugin.register_output('s3', self) def initialize @@ -41,18 +42,21 @@ config_param :aws_iam_retries, :integer, :default => 5 config_param :s3_bucket, :string config_param :s3_region, :string, :default => ENV["AWS_REGION"] || "us-east-1" config_param :s3_endpoint, :string, :default => nil config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}" + config_param :force_path_style, :bool, :default => false config_param :store_as, :string, :default => "gzip" config_param :auto_create_bucket, :bool, :default => true config_param :check_apikey_on_start, :bool, :default => true config_param :proxy_uri, :string, :default => nil config_param :reduced_redundancy, :bool, :default => false config_param :storage_class, :string, :default => "STANDARD" config_param :format, :string, :default => 'out_file' config_param :acl, :string, :default => :private + config_param :hex_random_length, :integer, :default => 4 + config_param :overwrite, :bool, :default => false attr_reader :bucket include Fluent::Mixin::ConfigPlaceholders @@ -67,19 +71,18 @@ raise ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services" end begin @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(:buffer_type => @buffer_type, :log => log) - rescue => e + rescue $log.warn "#{@store_as} not found. Use 'text' instead" @compressor = TextCompressor.new end @compressor.configure(conf) - # TODO: use Plugin.new_formatter instead of TextFormatter.create - conf['format'] = @format - @formatter = TextFormatter.create(conf) + @formatter = Plugin.new_formatter(@format) + @formatter.configure(conf) if @localtime @path_slicer = Proc.new {|path| Time.now.strftime(path) } @@ -88,60 +91,31 @@ Time.now.utc.strftime(path) } end @storage_class = "REDUCED_REDUNDANCY" if @reduced_redundancy + @values_for_s3_object_chunk = {} end def start super - options = {} - credentials_options = {} - case - when @aws_key_id && @aws_sec_key - options[:access_key_id] = @aws_key_id - options[:secret_access_key] = @aws_sec_key - when @assume_role_credentials - c = @assume_role_credentials - credentials_options[:role_arn] = c.role_arn - credentials_options[:role_session_name] = c.role_session_name - credentials_options[:policy] = c.policy if c.policy - credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds - credentials_options[:external_id] = c.external_id if c.external_id - options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options) - when @instance_profile_credentials - c = @instance_profile_credentials - credentials_options[:retries] = c.retries if c.retries - credentials_options[:ip_address] = c.ip_address if c.ip_address - credentials_options[:port] = c.port if c.port - credentials_options[:http_open_timeout] = c.http_open_timeout if c.http_open_timeout - credentials_options[:http_read_timeout] = c.http_read_timeout if c.http_read_timeout - options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options) - when @shared_credentials - c = @shared_credentials - credentials_options[:path] = c.path if c.path - credentials_options[:profile_name] = c.profile_name if c.profile_name - options[:credentials] = Aws::SharedCredentials.new(credentials_options) - when @aws_iam_retries - $log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead") - credentials_options[:retries] = @aws_iam_retries - options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options) - else - # Use default credentials - # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html - end + options = setup_credentials options[:region] = @s3_region if @s3_region options[:endpoint] = @s3_endpoint if @s3_endpoint options[:http_proxy] = @proxy_uri if @proxy_uri options[:s3_server_side_encryption] = @use_server_side_encryption.to_sym if @use_server_side_encryption + options[:force_path_style] = @force_path_style s3_client = Aws::S3::Client.new(options) @s3 = Aws::S3::Resource.new(:client => s3_client) @bucket = @s3.bucket(@s3_bucket) check_apikeys if @check_apikey_on_start ensure_bucket + + # Securerandom.hex(2) returns 4 length hex + @hex_random_n = (@hex_random_length + 1) / 2 end def format(tag, time, record) @formatter.format(tag, time, record) end @@ -150,42 +124,58 @@ i = 0 previous_path = nil begin path = @path_slicer.call(@path) + + @values_for_s3_object_chunk[chunk.key] ||= { + "hex_random" => hex_random, + "uuid_flush" => uuid_random, + } values_for_s3_object_key = { "path" => path, "time_slice" => chunk.key, "file_extension" => @compressor.ext, "index" => i, - "uuid_flush" => uuid_random - } + }.merge!(@values_for_s3_object_chunk[chunk.key]) + s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr| values_for_s3_object_key[expr[2...expr.size-1]] } if (i > 0) && (s3path == previous_path) - raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}" + if @overwrite + log.warn "#{s3path} already exists, but will overwrite" + break + else + raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}" + end end i += 1 previous_path = s3path end while @bucket.object(s3path).exists? tmp = Tempfile.new("s3-") begin @compressor.compress(chunk, tmp) tmp.rewind + log.debug { "out_s3: trying to write {object_id:#{chunk.object_id},time_slice:#{chunk.key}} to s3://#{@s3_bucket}/#{s3path}" } @bucket.object(s3path).put(:body => tmp, :content_type => @compressor.content_type, :storage_class => @storage_class) ensure + @values_for_s3_object_chunk.delete(chunk.key) tmp.close(true) rescue nil end end private + def hex_random + SecureRandom.hex(@hex_random_n)[0...@hex_random_length] + end + def ensure_bucket if !@bucket.exists? if @auto_create_bucket log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}" @s3.create_bucket(:bucket => @s3_bucket) @@ -199,9 +189,48 @@ @bucket.objects.first rescue Aws::S3::Errors::NoSuchBucket # ignore NoSuchBucket Error because ensure_bucket checks it. rescue => e raise "can't call S3 API. Please check your aws_key_id / aws_sec_key or s3_region configuration. error = #{e.inspect}" + end + + def setup_credentials + options = {} + credentials_options = {} + case + when @aws_key_id && @aws_sec_key + options[:access_key_id] = @aws_key_id + options[:secret_access_key] = @aws_sec_key + when @assume_role_credentials + c = @assume_role_credentials + credentials_options[:role_arn] = c.role_arn + credentials_options[:role_session_name] = c.role_session_name + credentials_options[:policy] = c.policy if c.policy + credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds + credentials_options[:external_id] = c.external_id if c.external_id + options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options) + when @instance_profile_credentials + c = @instance_profile_credentials + credentials_options[:retries] = c.retries if c.retries + credentials_options[:ip_address] = c.ip_address if c.ip_address + credentials_options[:port] = c.port if c.port + credentials_options[:http_open_timeout] = c.http_open_timeout if c.http_open_timeout + credentials_options[:http_read_timeout] = c.http_read_timeout if c.http_read_timeout + options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options) + when @shared_credentials + c = @shared_credentials + credentials_options[:path] = c.path if c.path + credentials_options[:profile_name] = c.profile_name if c.profile_name + options[:credentials] = Aws::SharedCredentials.new(credentials_options) + when @aws_iam_retries + $log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead") + credentials_options[:retries] = @aws_iam_retries + options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options) + else + # Use default credentials + # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html + end + options end class Compressor include Configurable