lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-1.0.0.rc3 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-1.0.0.rc4
- old
+ new
@@ -1,7 +1,8 @@
require 'fluent/plugin/output'
require 'fluent/log-ext'
+require 'fluent/timezone'
require 'aws-sdk-resources'
require 'zlib'
require 'time'
require 'tempfile'
@@ -155,10 +156,11 @@
@s3_object_key_format = process_s3_object_key_format
# For backward compatibility
# TODO: Remove time_slice_format when end of support compat_parameters
@configured_time_slice_format = conf['time_slice_format']
@values_for_s3_object_chunk = {}
+ @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
end
def multi_workers_ready?
true
end
@@ -198,15 +200,14 @@
def write(chunk)
i = 0
metadata = chunk.metadata
previous_path = nil
- time_slice_format = @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey'])
time_slice = if metadata.timekey.nil?
''.freeze
else
- Time.at(metadata.timekey).utc.strftime(time_slice_format)
+ @time_slice_with_tz.call(metadata.timekey)
end
if @check_object
begin
@values_for_s3_object_chunk[chunk.unique_id] ||= {
@@ -246,10 +247,11 @@
}
values_for_s3_object_key = {
"%{path}" => @path,
"%{time_slice}" => time_slice,
"%{file_extension}" => @compressor.ext,
+ "%{hms_slice}" => hms_slicer,
}.merge!(@values_for_s3_object_chunk[chunk.unique_id])
values_for_s3_object_key["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled
s3path = @s3_object_key_format.gsub(%r(%{[^}]+}), values_for_s3_object_key)
s3path = extract_placeholders(s3path, metadata)
@@ -352,10 +354,10 @@
def check_apikeys
@bucket.objects(prefix: @path).first
rescue Aws::S3::Errors::NoSuchBucket
# ignore NoSuchBucket Error because ensure_bucket checks it.
rescue => e
- raise "can't call S3 API. Please check your aws_key_id / aws_sec_key or s3_region configuration. error = #{e.inspect}"
+ raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{e.inspect}"
end
def setup_credentials
options = {}
credentials_options = {}