--- lib/fluent/plugin/out_s3.rb (fluent-plugin-s3-1.0.0.rc2)
+++ lib/fluent/plugin/out_s3.rb (fluent-plugin-s3-1.0.0.rc3)
@@ -1,13 +1,14 @@
-require 'fluent/output'
+require 'fluent/plugin/output'
+require 'fluent/log-ext'
 require 'aws-sdk-resources'
 require 'zlib'
 require 'time'
 require 'tempfile'
 
 module Fluent::Plugin
-  class S3Output < Fluent::Plugin::Output
+  class S3Output < Output
     Fluent::Plugin.register_output('s3', self)
 
     helpers :compat_parameters, :formatter, :inject
 
     def initialize
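
Note: the require/superclass changes above move the plugin onto Fluentd's v0.14 plugin API; inside module Fluent::Plugin the bare Output superclass resolves to Fluent::Plugin::Output, and fluent/log-ext extends the Fluentd logger so it can stand in for a stdlib Logger (relied on for the SDK logging further down). A minimal sketch of the API shape being targeted, illustrative only and not code from the gem:

    require 'fluent/plugin/output'

    module Fluent::Plugin
      # Hypothetical plugin, for illustration only
      class ExampleOutput < Output        # resolves to Fluent::Plugin::Output
        Fluent::Plugin.register_output('example', self)

        # v0.14 buffered path: write receives a buffer chunk
        def write(chunk)
          data = chunk.read               # raw formatted payload of this chunk
          log.debug "flushing #{data.bytesize} bytes"
        end
      end
    end
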
@@ -111,10 +112,15 @@
 
     config_section :format do
       config_set_default :@type, DEFAULT_FORMAT_TYPE
     end
 
+    config_section :buffer do
+      config_set_default :chunk_keys, ['time']
+      config_set_default :timekey, (60 * 60 * 24)
+    end
+
     attr_reader :bucket
 
     MAX_HEX_RANDOM_LENGTH = 16
 
     def configure(conf)
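
Note: the new <buffer> defaults key chunks on time with a one-day timekey, so an unconfigured buffer keeps producing daily objects, in line with the plugin's historical time_slice_format %Y%m%d default. Since chunk timekeys are aligned down to a timekey boundary, the strftime call in write reproduces the old daily slice; a quick standalone check:

    require 'time'

    timekey = 60 * 60 * 24                        # new default: one chunk per day
    chunk_timekey = Time.utc(2017, 11, 15).to_i   # a day-aligned chunk timekey
    puts Time.at(chunk_timekey).utc.strftime("%Y%m%d")   # => 20171115
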
@@ -164,10 +170,14 @@
       options[:http_proxy] = @proxy_uri if @proxy_uri
       options[:force_path_style] = @force_path_style
       options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil?
       options[:signature_version] = @signature_version unless @signature_version.nil?
       options[:ssl_verify_peer] = @ssl_verify_peer
+      log.on_trace do
+        options[:http_wire_trace] = true
+        options[:logger] = log
+      end
 
       s3_client = Aws::S3::Client.new(options)
       @s3 = Aws::S3::Resource.new(client: s3_client)
       @bucket = @s3.bucket(@s3_bucket)
 
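
Note: log.on_trace evaluates its block only when the effective log level is trace, so wire tracing costs nothing at normal levels. :http_wire_trace and :logger are standard aws-sdk client options; a standalone sketch of the same effect (the region and logger here are placeholders, not values from the plugin):

    require 'logger'
    require 'aws-sdk-resources'

    client = Aws::S3::Client.new(
      region: 'us-east-1',            # placeholder region
      http_wire_trace: true,          # dump raw HTTP requests/responses
      logger: Logger.new($stdout)     # destination for the wire dump
    )
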
@@ -186,30 +196,34 @@
       @formatter.format(tag, time, r)
     end
 
     def write(chunk)
       i = 0
+      metadata = chunk.metadata
       previous_path = nil
       time_slice_format = @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey'])
-      time_slice = Time.at(chunk.metadata.timekey).utc.strftime(time_slice_format)
+      time_slice = if metadata.timekey.nil?
+                     ''.freeze
+                   else
+                     Time.at(metadata.timekey).utc.strftime(time_slice_format)
+                   end
 
       if @check_object
         begin
-          path = extract_placeholders(@path, chunk.metadata)
-
           @values_for_s3_object_chunk[chunk.unique_id] ||= {
             "%{hex_random}" => hex_random(chunk),
           }
           values_for_s3_object_key = {
-            "%{path}" => path,
+            "%{path}" => @path,
             "%{time_slice}" => time_slice,
             "%{file_extension}" => @compressor.ext,
             "%{index}" => i,
           }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
           values_for_s3_object_key["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled
 
           s3path = @s3_object_key_format.gsub(%r(%{[^}]+}), values_for_s3_object_key)
+          s3path = extract_placeholders(s3path, metadata)
           if (i > 0) && (s3path == previous_path)
             if @overwrite
               log.warn "#{s3path} already exists, but will overwrite"
               break
             else
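
Note: two behavior changes land in this hunk. First, time_slice now degrades to an empty string when the chunk has no time chunk key: metadata.timekey is nil in that case, and in rc2 Time.at(nil) raised. A minimal reproduction of the failure the guard avoids:

    timekey = nil                     # metadata.timekey without a 'time' chunk key
    begin
      Time.at(timekey).utc.strftime("%Y%m%d")
    rescue TypeError => e
      puts "rc2 failure mode: #{e.class}"      # => TypeError
    end
    time_slice = timekey.nil? ? '' : Time.at(timekey).utc.strftime("%Y%m%d")
    puts time_slice.inspect           # => "" in rc3, instead of raising

Second, extract_placeholders now runs over the assembled object key rather than over path alone; see the sketch after the next hunk.
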
@@ -225,22 +239,21 @@
           hms_slicer = Time.now.strftime("%H%M%S")
         else
           hms_slicer = Time.now.utc.strftime("%H%M%S")
         end
 
-        path = extract_placeholders(@path, chunk.metadata)
-
         @values_for_s3_object_chunk[chunk.unique_id] ||= {
           "%{hex_random}" => hex_random(chunk),
         }
         values_for_s3_object_key = {
-          "%{path}" => path,
+          "%{path}" => @path,
           "%{time_slice}" => time_slice,
           "%{file_extension}" => @compressor.ext,
         }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
         values_for_s3_object_key["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled
         s3path = @s3_object_key_format.gsub(%r(%{[^}]+}), values_for_s3_object_key)
+        s3path = extract_placeholders(s3path, metadata)
       end
 
       tmp = Tempfile.new("s3-")
       tmp.binmode
       begin
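
Note: both branches now fill %{path} with the raw @path and apply extract_placeholders last, so buffer placeholders such as ${tag} are honored anywhere in s3_object_key_format, not only inside path. A sketch of the new two-stage expansion under assumed settings (path logs/${tag}/ and a default-style key format; extract_placeholders is the standard Fluent::Plugin::Output helper):

    key_format = "%{path}%{time_slice}_%{index}.%{file_extension}"
    values = {
      "%{path}"           => "logs/${tag}/",   # raw @path; ${tag} is no longer pre-resolved
      "%{time_slice}"     => "20171115",
      "%{index}"          => 0,
      "%{file_extension}" => "gz",
    }
    # Stage 1: the plugin's own %{...} tokens (a Hash replacement stringifies values)
    s3path = key_format.gsub(%r(%{[^}]+}), values)
    # => "logs/${tag}/20171115_0.gz"
    # Stage 2 (inside the plugin): extract_placeholders(s3path, metadata)
    # then resolves ${tag} and any strftime patterns against the chunk's metadata.
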