lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.0.pre1 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.0
- old
+ new
@@ -1,7 +1,8 @@
module Fluent
require 'fluent/mixin/config_placeholders'
+ require 'securerandom'
class S3Output < Fluent::TimeSlicedOutput
Fluent::Plugin.register_output('s3', self)
def initialize
@@ -41,18 +42,21 @@
config_param :aws_iam_retries, :integer, :default => 5
config_param :s3_bucket, :string
config_param :s3_region, :string, :default => ENV["AWS_REGION"] || "us-east-1"
config_param :s3_endpoint, :string, :default => nil
config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}"
+ config_param :force_path_style, :bool, :default => false
config_param :store_as, :string, :default => "gzip"
config_param :auto_create_bucket, :bool, :default => true
config_param :check_apikey_on_start, :bool, :default => true
config_param :proxy_uri, :string, :default => nil
config_param :reduced_redundancy, :bool, :default => false
config_param :storage_class, :string, :default => "STANDARD"
config_param :format, :string, :default => 'out_file'
config_param :acl, :string, :default => :private
+ config_param :hex_random_length, :integer, :default => 4
+ config_param :overwrite, :bool, :default => false
attr_reader :bucket
include Fluent::Mixin::ConfigPlaceholders
@@ -67,19 +71,18 @@
raise ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
end
begin
@compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(:buffer_type => @buffer_type, :log => log)
- rescue => e
+ rescue
$log.warn "#{@store_as} not found. Use 'text' instead"
@compressor = TextCompressor.new
end
@compressor.configure(conf)
- # TODO: use Plugin.new_formatter instead of TextFormatter.create
- conf['format'] = @format
- @formatter = TextFormatter.create(conf)
+ @formatter = Plugin.new_formatter(@format)
+ @formatter.configure(conf)
if @localtime
@path_slicer = Proc.new {|path|
Time.now.strftime(path)
}
@@ -88,60 +91,31 @@
Time.now.utc.strftime(path)
}
end
@storage_class = "REDUCED_REDUNDANCY" if @reduced_redundancy
+ @values_for_s3_object_chunk = {}
end
def start
super
- options = {}
- credentials_options = {}
- case
- when @aws_key_id && @aws_sec_key
- options[:access_key_id] = @aws_key_id
- options[:secret_access_key] = @aws_sec_key
- when @assume_role_credentials
- c = @assume_role_credentials
- credentials_options[:role_arn] = c.role_arn
- credentials_options[:role_session_name] = c.role_session_name
- credentials_options[:policy] = c.policy if c.policy
- credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
- credentials_options[:external_id] = c.external_id if c.external_id
- options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options)
- when @instance_profile_credentials
- c = @instance_profile_credentials
- credentials_options[:retries] = c.retries if c.retries
- credentials_options[:ip_address] = c.ip_address if c.ip_address
- credentials_options[:port] = c.port if c.port
- credentials_options[:http_open_timeout] = c.http_open_timeout if c.http_open_timeout
- credentials_options[:http_read_timeout] = c.http_read_timeout if c.http_read_timeout
- options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
- when @shared_credentials
- c = @shared_credentials
- credentials_options[:path] = c.path if c.path
- credentials_options[:profile_name] = c.profile_name if c.profile_name
- options[:credentials] = Aws::SharedCredentials.new(credentials_options)
- when @aws_iam_retries
- $log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead")
- credentials_options[:retries] = @aws_iam_retries
- options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
- else
- # Use default credentials
- # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html
- end
+ options = setup_credentials
options[:region] = @s3_region if @s3_region
options[:endpoint] = @s3_endpoint if @s3_endpoint
options[:http_proxy] = @proxy_uri if @proxy_uri
options[:s3_server_side_encryption] = @use_server_side_encryption.to_sym if @use_server_side_encryption
+ options[:force_path_style] = @force_path_style
s3_client = Aws::S3::Client.new(options)
@s3 = Aws::S3::Resource.new(:client => s3_client)
@bucket = @s3.bucket(@s3_bucket)
check_apikeys if @check_apikey_on_start
ensure_bucket
+
+ # Securerandom.hex(2) returns 4 length hex
+ @hex_random_n = (@hex_random_length + 1) / 2
end
def format(tag, time, record)
@formatter.format(tag, time, record)
end
@@ -150,42 +124,58 @@
i = 0
previous_path = nil
begin
path = @path_slicer.call(@path)
+
+ @values_for_s3_object_chunk[chunk.key] ||= {
+ "hex_random" => hex_random,
+ "uuid_flush" => uuid_random,
+ }
values_for_s3_object_key = {
"path" => path,
"time_slice" => chunk.key,
"file_extension" => @compressor.ext,
"index" => i,
- "uuid_flush" => uuid_random
- }
+ }.merge!(@values_for_s3_object_chunk[chunk.key])
+
s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
values_for_s3_object_key[expr[2...expr.size-1]]
}
if (i > 0) && (s3path == previous_path)
- raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
+ if @overwrite
+ log.warn "#{s3path} already exists, but will overwrite"
+ break
+ else
+ raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
+ end
end
i += 1
previous_path = s3path
end while @bucket.object(s3path).exists?
tmp = Tempfile.new("s3-")
begin
@compressor.compress(chunk, tmp)
tmp.rewind
+ log.debug { "out_s3: trying to write {object_id:#{chunk.object_id},time_slice:#{chunk.key}} to s3://#{@s3_bucket}/#{s3path}" }
@bucket.object(s3path).put(:body => tmp,
:content_type => @compressor.content_type,
:storage_class => @storage_class)
ensure
+ @values_for_s3_object_chunk.delete(chunk.key)
tmp.close(true) rescue nil
end
end
private
+ def hex_random
+ SecureRandom.hex(@hex_random_n)[0...@hex_random_length]
+ end
+
def ensure_bucket
if !@bucket.exists?
if @auto_create_bucket
log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
@s3.create_bucket(:bucket => @s3_bucket)
@@ -199,9 +189,48 @@
@bucket.objects.first
rescue Aws::S3::Errors::NoSuchBucket
# ignore NoSuchBucket Error because ensure_bucket checks it.
rescue => e
raise "can't call S3 API. Please check your aws_key_id / aws_sec_key or s3_region configuration. error = #{e.inspect}"
+ end
+
+ def setup_credentials
+ options = {}
+ credentials_options = {}
+ case
+ when @aws_key_id && @aws_sec_key
+ options[:access_key_id] = @aws_key_id
+ options[:secret_access_key] = @aws_sec_key
+ when @assume_role_credentials
+ c = @assume_role_credentials
+ credentials_options[:role_arn] = c.role_arn
+ credentials_options[:role_session_name] = c.role_session_name
+ credentials_options[:policy] = c.policy if c.policy
+ credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
+ credentials_options[:external_id] = c.external_id if c.external_id
+ options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options)
+ when @instance_profile_credentials
+ c = @instance_profile_credentials
+ credentials_options[:retries] = c.retries if c.retries
+ credentials_options[:ip_address] = c.ip_address if c.ip_address
+ credentials_options[:port] = c.port if c.port
+ credentials_options[:http_open_timeout] = c.http_open_timeout if c.http_open_timeout
+ credentials_options[:http_read_timeout] = c.http_read_timeout if c.http_read_timeout
+ options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
+ when @shared_credentials
+ c = @shared_credentials
+ credentials_options[:path] = c.path if c.path
+ credentials_options[:profile_name] = c.profile_name if c.profile_name
+ options[:credentials] = Aws::SharedCredentials.new(credentials_options)
+ when @aws_iam_retries
+ $log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead")
+ credentials_options[:retries] = @aws_iam_retries
+ options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
+ else
+ # Use default credentials
+ # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html
+ end
+ options
end
class Compressor
include Configurable