lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.0 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.1
- old
+ new
@@ -57,10 +57,11 @@
config_param :overwrite, :bool, :default => false
attr_reader :bucket
include Fluent::Mixin::ConfigPlaceholders
+ MAX_HEX_RANDOM_LENGTH = 16
def placeholders
[:percent]
end
@@ -90,21 +91,24 @@
@path_slicer = Proc.new {|path|
Time.now.utc.strftime(path)
}
end
+ if @hex_random_length > MAX_HEX_RANDOM_LENGTH
+ raise ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
+ end
+
@storage_class = "REDUCED_REDUNDANCY" if @reduced_redundancy
@values_for_s3_object_chunk = {}
end
def start
super
options = setup_credentials
options[:region] = @s3_region if @s3_region
options[:endpoint] = @s3_endpoint if @s3_endpoint
options[:http_proxy] = @proxy_uri if @proxy_uri
- options[:s3_server_side_encryption] = @use_server_side_encryption.to_sym if @use_server_side_encryption
options[:force_path_style] = @force_path_style
s3_client = Aws::S3::Client.new(options)
@s3 = Aws::S3::Resource.new(:client => s3_client)
@bucket = @s3.bucket(@s3_bucket)
@@ -125,20 +129,20 @@
previous_path = nil
begin
path = @path_slicer.call(@path)
- @values_for_s3_object_chunk[chunk.key] ||= {
- "hex_random" => hex_random,
- "uuid_flush" => uuid_random,
+ @values_for_s3_object_chunk[chunk.unique_id] ||= {
+ "hex_random" => hex_random(chunk),
}
values_for_s3_object_key = {
"path" => path,
"time_slice" => chunk.key,
"file_extension" => @compressor.ext,
"index" => i,
- }.merge!(@values_for_s3_object_chunk[chunk.key])
+ "uuid_flush" => uuid_random,
+ }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
values_for_s3_object_key[expr[2...expr.size-1]]
}
if (i > 0) && (s3path == previous_path)
@@ -153,26 +157,46 @@
i += 1
previous_path = s3path
end while @bucket.object(s3path).exists?
tmp = Tempfile.new("s3-")
+ tmp.binmode
begin
@compressor.compress(chunk, tmp)
tmp.rewind
- log.debug { "out_s3: trying to write {object_id:#{chunk.object_id},time_slice:#{chunk.key}} to s3://#{@s3_bucket}/#{s3path}" }
- @bucket.object(s3path).put(:body => tmp,
- :content_type => @compressor.content_type,
- :storage_class => @storage_class)
+ log.debug { "out_s3: write chunk: {key:#{chunk.key},tsuffix:#{tsuffix(chunk)}} to s3://#{@s3_bucket}/#{s3path}" }
+
+ put_options = {:body => tmp, :content_type => @compressor.content_type, :storage_class => @storage_class}
+ put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
+ @bucket.object(s3path).put(put_options)
+
+ @values_for_s3_object_chunk.delete(chunk.unique_id)
ensure
- @values_for_s3_object_chunk.delete(chunk.key)
tmp.close(true) rescue nil
end
end
private
- def hex_random
- SecureRandom.hex(@hex_random_n)[0...@hex_random_length]
+ # tsuffix is the one which file buffer filename has
+ def tsuffix(chunk)
+ if chunk.is_a?(Fluent::FileBufferChunk)
+ unique_id = chunk.unique_id
+ tsuffix = unique_id[0...(unique_id.size/2)].unpack('C*').map {|x| x.to_s(16) }.join('') # size: 16
+ else
+ nil
+ end
+ end
+
+ def hex_random(chunk)
+ if chunk.is_a?(Fluent::FileBufferChunk)
+ # let me use tsuffix because its value is kept on retrying even after rebooting
+ tsuffix = tsuffix(chunk)
+ tsuffix.reverse! # tsuffix is like (time_sec, time_usec, rand) => reversing gives more randomness
+ tsuffix[0...@hex_random_length]
+ else
+ SecureRandom.hex(@hex_random_n)[0...@hex_random_length]
+ end
end
def ensure_bucket
if !@bucket.exists?
if @auto_create_bucket