lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.7.2 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.8.0.rc1

- old
+ new

@@ -1,10 +1,8 @@
 require 'fluent/output'
 
 module Fluent
-  require 'fluent/mixin/config_placeholders'
-
   class S3Output < Fluent::TimeSlicedOutput
     Fluent::Plugin.register_output('s3', self)
 
     def initialize
       super
@@ -12,10 +10,11 @@
       require 'zlib'
       require 'time'
       require 'tempfile'
 
       @compressor = nil
+      @uuid_flush_enabled = false
     end
 
     # For fluentd v0.12.16 or earlier
     class << self
       unless method_defined?(:desc)
@@ -117,17 +116,12 @@
     desc "Given a threshold to treat events as delay, output warning logs if delayed events were put into s3"
     config_param :warn_for_delay, :time, :default => nil
 
     attr_reader :bucket
 
-    include Fluent::Mixin::ConfigPlaceholders
     MAX_HEX_RANDOM_LENGTH = 16
 
-    def placeholders
-      [:percent]
-    end
-
     def configure(conf)
       super
 
       if @s3_endpoint && @s3_endpoint.end_with?('amazonaws.com')
         raise ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
@@ -160,10 +154,12 @@
       if @reduced_redundancy
         $log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
         @storage_class = "REDUCED_REDUNDANCY"
       end
+
+      @s3_object_key_format = process_s3_object_key_format
 
       @values_for_s3_object_chunk = {}
     end
 
     def start
       options = setup_credentials
@@ -201,12 +197,12 @@
         values_for_s3_object_key = {
           "path" => path,
           "time_slice" => chunk.key,
           "file_extension" => @compressor.ext,
           "index" => i,
-          "uuid_flush" => uuid_random,
         }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
+        values_for_s3_object_key['uuid_flush'.freeze] = uuid_random if @uuid_flush_enabled
 
         s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
           values_for_s3_object_key[expr[2...expr.size-1]]
         }
         if (i > 0) && (s3path == previous_path)
@@ -266,18 +262,50 @@
       unique_hex = unique_hex(chunk)
       unique_hex.reverse! # unique_hex is like (time_sec, time_usec, rand) => reversing gives more randomness
       unique_hex[0...@hex_random_length]
     end
 
+    def uuid_random
+      ::UUIDTools::UUID.random_create.to_s
+    end
+
     def ensure_bucket
       if !@bucket.exists?
         if @auto_create_bucket
           log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
           @s3.create_bucket(:bucket => @s3_bucket)
         else
           raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
         end
       end
+    end
+
+    def process_s3_object_key_format
+      %W(%{uuid} %{uuid:random} %{uuid:hostname} %{uuid:timestamp}).each { |ph|
+        if @s3_object_key_format.include?(ph)
+          raise ConfigError, %!#{ph} placeholder in s3_object_key_format is removed!
+        end
+      }
+
+      if @s3_object_key_format.include?('%{uuid_flush}')
+        # test uuidtools works or not
+        begin
+          require 'uuidtools'
+        rescue LoadError
+          raise ConfigError, "uuidtools gem not found. Install uuidtools gem first"
+        end
+        begin
+          uuid_random
+        rescue => e
+          raise ConfigError, "Generating uuid doesn't work. Can't use %{uuid_flush} on this environment. #{e}"
+        end
+        @uuid_flush_enabled = true
+      end
+
+      @s3_object_key_format.gsub('%{hostname}') { |expr|
+        log.warn "%{hostname} will be removed in the future. Use \"\#{Socket.gethostname}\" instead"
+        Socket.gethostname
+      }
    end
 
     def check_apikeys
       @bucket.objects(:prefix => @path).first
     rescue Aws::S3::Errors::NoSuchBucket
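
Summary of the behavioral change above: the %{uuid}, %{uuid:random}, %{uuid:hostname} and %{uuid:timestamp} placeholders are now rejected at configure time, %{hostname} is deprecated in favour of "#{Socket.gethostname}", and a UUID for %{uuid_flush} is only generated when the key format actually references it (which also requires the uuidtools gem). The snippet below is a minimal standalone sketch of that %{...} expansion, not plugin code: expand_key is a hypothetical helper name, and SecureRandom stands in for uuidtools so the example runs with the Ruby standard library alone.

# Standalone illustration only -- not part of fluent-plugin-s3.
require 'securerandom'
require 'socket'

def expand_key(format, values)
  # Same substitution pattern the plugin uses: each %{name} token is
  # replaced with the matching entry from the values hash.
  format.gsub(%r(%{[^}]+})) { |expr| values[expr[2...expr.size - 1]].to_s }
end

format = "%{path}%{time_slice}_%{index}_%{uuid_flush}.%{file_extension}"

values = {
  "path"           => "logs/",
  "time_slice"     => "20160501",
  "file_extension" => "gz",
  "index"          => 0,
  "hostname"       => Socket.gethostname,
}
# Mirrors the @uuid_flush_enabled guard added in 0.8.0.rc1: a UUID is
# generated only when the key format actually contains %{uuid_flush}.
values["uuid_flush"] = SecureRandom.uuid if format.include?("%{uuid_flush}")

puts expand_key(format, values)
# => e.g. "logs/20160501_0_9f4c0d3e-....gz"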