lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.0 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.6.1

- lines prefixed with `-` are from the old version (fluent-plugin-s3-0.6.0)
+ lines prefixed with `+` are from the new version (fluent-plugin-s3-0.6.1)

@@ -57,10 +57,11 @@
   config_param :overwrite, :bool, :default => false
 
   attr_reader :bucket
 
   include Fluent::Mixin::ConfigPlaceholders
 
+  MAX_HEX_RANDOM_LENGTH = 16
 
   def placeholders
     [:percent]
   end
@@ -90,21 +91,24 @@
       @path_slicer = Proc.new {|path|
         Time.now.utc.strftime(path)
       }
     end
 
+    if @hex_random_length > MAX_HEX_RANDOM_LENGTH
+      raise ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
+    end
+
     @storage_class = "REDUCED_REDUNDANCY" if @reduced_redundancy
     @values_for_s3_object_chunk = {}
   end
 
   def start
     super
     options = setup_credentials
     options[:region] = @s3_region if @s3_region
     options[:endpoint] = @s3_endpoint if @s3_endpoint
     options[:http_proxy] = @proxy_uri if @proxy_uri
-    options[:s3_server_side_encryption] = @use_server_side_encryption.to_sym if @use_server_side_encryption
     options[:force_path_style] = @force_path_style
     s3_client = Aws::S3::Client.new(options)
     @s3 = Aws::S3::Resource.new(:client => s3_client)
     @bucket = @s3.bucket(@s3_bucket)
@@ -125,20 +129,20 @@
     previous_path = nil
     begin
       path = @path_slicer.call(@path)
-      @values_for_s3_object_chunk[chunk.key] ||= {
-        "hex_random" => hex_random,
-        "uuid_flush" => uuid_random,
+      @values_for_s3_object_chunk[chunk.unique_id] ||= {
+        "hex_random" => hex_random(chunk),
       }
       values_for_s3_object_key = {
         "path" => path,
         "time_slice" => chunk.key,
         "file_extension" => @compressor.ext,
         "index" => i,
-      }.merge!(@values_for_s3_object_chunk[chunk.key])
+        "uuid_flush" => uuid_random,
+      }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
       s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
         values_for_s3_object_key[expr[2...expr.size-1]]
       }
       if (i > 0) && (s3path == previous_path)
@@ -153,26 +157,46 @@
         i += 1
         previous_path = s3path
       end while @bucket.object(s3path).exists?
       tmp = Tempfile.new("s3-")
+      tmp.binmode
       begin
         @compressor.compress(chunk, tmp)
         tmp.rewind
-        log.debug { "out_s3: trying to write {object_id:#{chunk.object_id},time_slice:#{chunk.key}} to s3://#{@s3_bucket}/#{s3path}" }
-        @bucket.object(s3path).put(:body => tmp,
-                                   :content_type => @compressor.content_type,
-                                   :storage_class => @storage_class)
+        log.debug { "out_s3: write chunk: {key:#{chunk.key},tsuffix:#{tsuffix(chunk)}} to s3://#{@s3_bucket}/#{s3path}" }
+
+        put_options = {:body => tmp, :content_type => @compressor.content_type, :storage_class => @storage_class}
+        put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
+        @bucket.object(s3path).put(put_options)
+
+        @values_for_s3_object_chunk.delete(chunk.unique_id)
       ensure
-        @values_for_s3_object_chunk.delete(chunk.key)
         tmp.close(true) rescue nil
       end
     end
 
     private
 
-    def hex_random
-      SecureRandom.hex(@hex_random_n)[0...@hex_random_length]
+    # tsuffix is the one which file buffer filename has
+    def tsuffix(chunk)
+      if chunk.is_a?(Fluent::FileBufferChunk)
+        unique_id = chunk.unique_id
+        tsuffix = unique_id[0...(unique_id.size/2)].unpack('C*').map {|x| x.to_s(16) }.join('') # size: 16
+      else
+        nil
+      end
+    end
+
+    def hex_random(chunk)
+      if chunk.is_a?(Fluent::FileBufferChunk)
+        # let me use tsuffix because its value is kept on retrying even after rebooting
+        tsuffix = tsuffix(chunk)
+        tsuffix.reverse! # tsuffix is like (time_sec, time_usec, rand) => reversing gives more randomness
+        tsuffix[0...@hex_random_length]
+      else
+        SecureRandom.hex(@hex_random_n)[0...@hex_random_length]
+      end
     end
 
     def ensure_bucket
       if !@bucket.exists?
         if @auto_create_bucket