lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-1.0.0.rc3 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-1.0.0.rc4

- old
+ new

@@ -1,7 +1,8 @@ require 'fluent/plugin/output' require 'fluent/log-ext' +require 'fluent/timezone' require 'aws-sdk-resources' require 'zlib' require 'time' require 'tempfile' @@ -155,10 +156,11 @@ @s3_object_key_format = process_s3_object_key_format # For backward compatibility # TODO: Remove time_slice_format when end of support compat_parameters @configured_time_slice_format = conf['time_slice_format'] @values_for_s3_object_chunk = {} + @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey'])) end def multi_workers_ready? true end @@ -198,15 +200,14 @@ def write(chunk) i = 0 metadata = chunk.metadata previous_path = nil - time_slice_format = @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']) time_slice = if metadata.timekey.nil? ''.freeze else - Time.at(metadata.timekey).utc.strftime(time_slice_format) + @time_slice_with_tz.call(metadata.timekey) end if @check_object begin @values_for_s3_object_chunk[chunk.unique_id] ||= { @@ -246,10 +247,11 @@ } values_for_s3_object_key = { "%{path}" => @path, "%{time_slice}" => time_slice, "%{file_extension}" => @compressor.ext, + "%{hms_slice}" => hms_slicer, }.merge!(@values_for_s3_object_chunk[chunk.unique_id]) values_for_s3_object_key["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled s3path = @s3_object_key_format.gsub(%r(%{[^}]+}), values_for_s3_object_key) s3path = extract_placeholders(s3path, metadata) @@ -352,10 +354,10 @@ def check_apikeys @bucket.objects(prefix: @path).first rescue Aws::S3::Errors::NoSuchBucket # ignore NoSuchBucket Error because ensure_bucket checks it. rescue => e - raise "can't call S3 API. Please check your aws_key_id / aws_sec_key or s3_region configuration. error = #{e.inspect}" + raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{e.inspect}" end def setup_credentials options = {} credentials_options = {}