lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-1.0.0.rc2 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-1.0.0.rc3

- old
+ new

@@ -1,13 +1,14 @@ -require 'fluent/output' +require 'fluent/plugin/output' +require 'fluent/log-ext' require 'aws-sdk-resources' require 'zlib' require 'time' require 'tempfile' module Fluent::Plugin - class S3Output < Fluent::Plugin::Output + class S3Output < Output Fluent::Plugin.register_output('s3', self) helpers :compat_parameters, :formatter, :inject def initialize @@ -111,10 +112,15 @@ config_section :format do config_set_default :@type, DEFAULT_FORMAT_TYPE end + config_section :buffer do + config_set_default :chunk_keys, ['time'] + config_set_default :timekey, (60 * 60 * 24) + end + attr_reader :bucket MAX_HEX_RANDOM_LENGTH = 16 def configure(conf) @@ -164,10 +170,14 @@ options[:http_proxy] = @proxy_uri if @proxy_uri options[:force_path_style] = @force_path_style options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil? options[:signature_version] = @signature_version unless @signature_version.nil? options[:ssl_verify_peer] = @ssl_verify_peer + log.on_trace do + options[:http_wire_trace] = true + options[:logger] = log + end s3_client = Aws::S3::Client.new(options) @s3 = Aws::S3::Resource.new(client: s3_client) @bucket = @s3.bucket(@s3_bucket) @@ -186,30 +196,34 @@ @formatter.format(tag, time, r) end def write(chunk) i = 0 + metadata = chunk.metadata previous_path = nil time_slice_format = @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']) - time_slice = Time.at(chunk.metadata.timekey).utc.strftime(time_slice_format) + time_slice = if metadata.timekey.nil? + ''.freeze + else + Time.at(metadata.timekey).utc.strftime(time_slice_format) + end if @check_object begin - path = extract_placeholders(@path, chunk.metadata) - @values_for_s3_object_chunk[chunk.unique_id] ||= { "%{hex_random}" => hex_random(chunk), } values_for_s3_object_key = { - "%{path}" => path, + "%{path}" => @path, "%{time_slice}" => time_slice, "%{file_extension}" => @compressor.ext, "%{index}" => i, }.merge!(@values_for_s3_object_chunk[chunk.unique_id]) values_for_s3_object_key["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled s3path = @s3_object_key_format.gsub(%r(%{[^}]+}), values_for_s3_object_key) + s3path = extract_placeholders(s3path, metadata) if (i > 0) && (s3path == previous_path) if @overwrite log.warn "#{s3path} already exists, but will overwrite" break else @@ -225,22 +239,21 @@ hms_slicer = Time.now.strftime("%H%M%S") else hms_slicer = Time.now.utc.strftime("%H%M%S") end - path = extract_placeholders(@path, chunk.metadata) - @values_for_s3_object_chunk[chunk.unique_id] ||= { "%{hex_random}" => hex_random(chunk), } values_for_s3_object_key = { - "%{path}" => path, + "%{path}" => @path, "%{time_slice}" => time_slice, "%{file_extension}" => @compressor.ext, }.merge!(@values_for_s3_object_chunk[chunk.unique_id]) values_for_s3_object_key["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled s3path = @s3_object_key_format.gsub(%r(%{[^}]+}), values_for_s3_object_key) + s3path = extract_placeholders(s3path, metadata) end tmp = Tempfile.new("s3-") tmp.binmode begin