lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.7.2 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.8.0.rc1
- old
+ new
@@ -1,10 +1,8 @@
require 'fluent/output'
module Fluent
- require 'fluent/mixin/config_placeholders'
-
class S3Output < Fluent::TimeSlicedOutput
Fluent::Plugin.register_output('s3', self)
def initialize
super
@@ -12,10 +10,11 @@
require 'zlib'
require 'time'
require 'tempfile'
@compressor = nil
+ @uuid_flush_enabled = false
end
# For fluentd v0.12.16 or earlier
class << self
unless method_defined?(:desc)
@@ -117,17 +116,12 @@
desc "Given a threshold to treat events as delay, output warning logs if delayed events were put into s3"
config_param :warn_for_delay, :time, :default => nil
attr_reader :bucket
- include Fluent::Mixin::ConfigPlaceholders
MAX_HEX_RANDOM_LENGTH = 16
- def placeholders
- [:percent]
- end
-
def configure(conf)
super
if @s3_endpoint && @s3_endpoint.end_with?('amazonaws.com')
raise ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
@@ -160,10 +154,12 @@
if @reduced_redundancy
$log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
@storage_class = "REDUCED_REDUNDANCY"
end
+
+ @s3_object_key_format = process_s3_object_key_format
@values_for_s3_object_chunk = {}
end
def start
options = setup_credentials
@@ -201,12 +197,12 @@
values_for_s3_object_key = {
"path" => path,
"time_slice" => chunk.key,
"file_extension" => @compressor.ext,
"index" => i,
- "uuid_flush" => uuid_random,
}.merge!(@values_for_s3_object_chunk[chunk.unique_id])
+ values_for_s3_object_key['uuid_flush'.freeze] = uuid_random if @uuid_flush_enabled
s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
values_for_s3_object_key[expr[2...expr.size-1]]
}
if (i > 0) && (s3path == previous_path)
@@ -266,18 +262,50 @@
unique_hex = unique_hex(chunk)
unique_hex.reverse! # unique_hex is like (time_sec, time_usec, rand) => reversing gives more randomness
unique_hex[0...@hex_random_length]
end
+ def uuid_random
+ ::UUIDTools::UUID.random_create.to_s
+ end
+
def ensure_bucket
if !@bucket.exists?
if @auto_create_bucket
log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
@s3.create_bucket(:bucket => @s3_bucket)
else
raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
end
end
+ end
+
+ def process_s3_object_key_format
+ %W(%{uuid} %{uuid:random} %{uuid:hostname} %{uuid:timestamp}).each { |ph|
+ if @s3_object_key_format.include?(ph)
+ raise ConfigError, %!#{ph} placeholder in s3_object_key_format is removed!
+ end
+ }
+
+ if @s3_object_key_format.include?('%{uuid_flush}')
+ # test uuidtools works or not
+ begin
+ require 'uuidtools'
+ rescue LoadError
+ raise ConfigError, "uuidtools gem not found. Install uuidtools gem first"
+ end
+ begin
+ uuid_random
+ rescue => e
+ raise ConfigError, "Generating uuid doesn't work. Can't use %{uuid_flush} on this environment. #{e}"
+ end
+ @uuid_flush_enabled = true
+ end
+
+ @s3_object_key_format.gsub('%{hostname}') { |expr|
+ log.warn "%{hostname} will be removed in the future. Use \"\#{Socket.gethostname}\" instead"
+ Socket.gethostname
+ }
end
def check_apikeys
@bucket.objects(:prefix => @path).first
rescue Aws::S3::Errors::NoSuchBucket