lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.8.8 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-1.0.0.rc1
- old
+ new
@@ -1,191 +1,155 @@
require 'fluent/output'
-require 'fluent/log-ext'
+require 'aws-sdk-resources'
+require 'zlib'
+require 'time'
+require 'tempfile'
-module Fluent
- class S3Output < Fluent::TimeSlicedOutput
+module Fluent::Plugin
+ class S3Output < Fluent::Plugin::Output
Fluent::Plugin.register_output('s3', self)
+ helpers :compat_parameters, :formatter, :inject
+
def initialize
super
- require 'aws-sdk-resources'
- require 'zlib'
- require 'time'
- require 'tempfile'
-
@compressor = nil
@uuid_flush_enabled = false
end
- # For fluentd v0.12.16 or earlier
- class << self
- unless method_defined?(:desc)
- def desc(description)
- end
- end
- end
- unless Fluent::Config::ConfigureProxy.method_defined?(:desc)
- Fluent::Config::ConfigureProxy.class_eval do
- def desc(description)
- end
- end
- end
-
desc "Path prefix of the files on S3"
- config_param :path, :string, :default => ""
+ config_param :path, :string, default: ""
desc "The Server-side encryption algorithm used when storing this object in S3 (AES256, aws:kms)"
- config_param :use_server_side_encryption, :string, :default => nil
+ config_param :use_server_side_encryption, :string, default: nil
desc "AWS access key id"
- config_param :aws_key_id, :string, :default => nil, :secret => true
+ config_param :aws_key_id, :string, default: nil, secret: true
desc "AWS secret key."
- config_param :aws_sec_key, :string, :default => nil, :secret => true
- config_section :assume_role_credentials, :multi => false do
+ config_param :aws_sec_key, :string, default: nil, secret: true
+ config_section :assume_role_credentials, multi: false do
desc "The Amazon Resource Name (ARN) of the role to assume"
- config_param :role_arn, :string, :secret => true
+ config_param :role_arn, :string, secret: true
desc "An identifier for the assumed role session"
config_param :role_session_name, :string
desc "An IAM policy in JSON format"
- config_param :policy, :string, :default => nil
+ config_param :policy, :string, default: nil
desc "The duration, in seconds, of the role session (900-3600)"
- config_param :duration_seconds, :integer, :default => nil
+ config_param :duration_seconds, :integer, default: nil
desc "A unique identifier that is used by third parties when assuming roles in their customers' accounts."
- config_param :external_id, :string, :default => nil, :secret => true
+ config_param :external_id, :string, default: nil, secret: true
end
- config_section :instance_profile_credentials, :multi => false do
+ config_section :instance_profile_credentials, multi: false do
desc "Number of times to retry when retrieving credentials"
- config_param :retries, :integer, :default => nil
+ config_param :retries, :integer, default: nil
desc "IP address (default:169.254.169.254)"
- config_param :ip_address, :string, :default => nil
+ config_param :ip_address, :string, default: nil
desc "Port number (default:80)"
- config_param :port, :integer, :default => nil
+ config_param :port, :integer, default: nil
desc "Number of seconds to wait for the connection to open"
- config_param :http_open_timeout, :float, :default => nil
+ config_param :http_open_timeout, :float, default: nil
desc "Number of seconds to wait for one block to be read"
- config_param :http_read_timeout, :float, :default => nil
+ config_param :http_read_timeout, :float, default: nil
# config_param :delay, :integer or :proc, :default => nil
# config_param :http_degub_output, :io, :default => nil
end
- config_section :shared_credentials, :multi => false do
+ config_section :shared_credentials, multi: false do
desc "Path to the shared file. (default: $HOME/.aws/credentials)"
- config_param :path, :string, :default => nil
+ config_param :path, :string, default: nil
desc "Profile name. Default to 'default' or ENV['AWS_PROFILE']"
- config_param :profile_name, :string, :default => nil
+ config_param :profile_name, :string, default: nil
end
desc "The number of attempts to load instance profile credentials from the EC2 metadata service using IAM role"
- config_param :aws_iam_retries, :integer, :default => nil
+ config_param :aws_iam_retries, :integer, default: nil, deprecated: "Use 'instance_profile_credentials' instead"
desc "S3 bucket name"
config_param :s3_bucket, :string
desc "S3 region name"
- config_param :s3_region, :string, :default => ENV["AWS_REGION"] || "us-east-1"
+ config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1"
desc "Use 's3_region' instead"
- config_param :s3_endpoint, :string, :default => nil
- desc "If false, the certificate of endpoint will not be verified"
- config_param :ssl_verify_peer, :bool, :default => true
+ config_param :s3_endpoint, :string, default: nil
desc "The format of S3 object keys"
- config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}"
+ config_param :s3_object_key_format, :string, default: "%{path}%{time_slice}_%{index}.%{file_extension}"
desc "If true, the bucket name is always left in the request URI and never moved to the host as a sub-domain"
- config_param :force_path_style, :bool, :default => false
+ config_param :force_path_style, :bool, default: false
desc "Archive format on S3"
- config_param :store_as, :string, :default => "gzip"
+ config_param :store_as, :string, default: "gzip"
desc "Create S3 bucket if it does not exists"
- config_param :auto_create_bucket, :bool, :default => true
+ config_param :auto_create_bucket, :bool, default: true
desc "Check AWS key on start"
- config_param :check_apikey_on_start, :bool, :default => true
+ config_param :check_apikey_on_start, :bool, default: true
desc "URI of proxy environment"
- config_param :proxy_uri, :string, :default => nil
+ config_param :proxy_uri, :string, default: nil
desc "Use S3 reduced redundancy storage for 33% cheaper pricing. Deprecated. Use storage_class instead"
- config_param :reduced_redundancy, :bool, :default => false
+ config_param :reduced_redundancy, :bool, default: false, deprecated: "Use storage_class parameter instead."
desc "The type of storage to use for the object(STANDARD,REDUCED_REDUNDANCY,STANDARD_IA)"
- config_param :storage_class, :string, :default => "STANDARD"
- desc "Change one line format in the S3 object (out_file,json,ltsv,single_value)"
- config_param :format, :string, :default => 'out_file'
+ config_param :storage_class, :string, default: "STANDARD"
desc "Permission for the object in S3"
- config_param :acl, :string, :default => nil
- desc "Allows grantee READ, READ_ACP, and WRITE_ACP permissions on the object"
- config_param :grant_full_control, :string, :default => nil
- desc "Allows grantee to read the object data and its metadata"
- config_param :grant_read, :string, :default => nil
- desc "Allows grantee to read the object ACL"
- config_param :grant_read_acp, :string, :default => nil
- desc "Allows grantee to write the ACL for the applicable object"
- config_param :grant_write_acp, :string, :default => nil
+ config_param :acl, :string, default: nil
desc "The length of `%{hex_random}` placeholder(4-16)"
- config_param :hex_random_length, :integer, :default => 4
+ config_param :hex_random_length, :integer, default: 4
desc "Overwrite already existing path"
- config_param :overwrite, :bool, :default => false
+ config_param :overwrite, :bool, default: false
desc "Check bucket if exists or not"
- config_param :check_bucket, :bool, :default => true
+ config_param :check_bucket, :bool, default: true
desc "Check object before creation"
- config_param :check_object, :bool, :default => true
+ config_param :check_object, :bool, default: true
desc "Specifies the AWS KMS key ID to use for object encryption"
- config_param :ssekms_key_id, :string, :default => nil, :secret => true
+ config_param :ssekms_key_id, :string, default: nil, secret: true
desc "Specifies the algorithm to use to when encrypting the object"
- config_param :sse_customer_algorithm, :string, :default => nil
+ config_param :sse_customer_algorithm, :string, default: nil
desc "Specifies the customer-provided encryption key for Amazon S3 to use in encrypting data"
- config_param :sse_customer_key, :string, :default => nil, :secret => true
+ config_param :sse_customer_key, :string, default: nil, secret: true
desc "Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321"
- config_param :sse_customer_key_md5, :string, :default => nil, :secret => true
+ config_param :sse_customer_key_md5, :string, default: nil, secret: true
desc "AWS SDK uses MD5 for API request/response by default"
- config_param :compute_checksums, :bool, :default => nil # use nil to follow SDK default configuration
+ config_param :compute_checksums, :bool, default: nil # use nil to follow SDK default configuration
desc "Signature version for API Request (s3,v4)"
- config_param :signature_version, :string, :default => nil # use nil to follow SDK default configuration
+ config_param :signature_version, :string, default: nil # use nil to follow SDK default configuration
desc "Given a threshold to treat events as delay, output warning logs if delayed events were put into s3"
- config_param :warn_for_delay, :time, :default => nil
- desc "Directory for temporary files, instead of system temp directory."
- config_param :tmp_dir, :string, :default => nil
+ config_param :warn_for_delay, :time, default: nil
+ DEFAULT_FORMAT_TYPE = "out_file"
+
+ config_section :format do
+ config_set_default :@type, DEFAULT_FORMAT_TYPE
+ end
+
attr_reader :bucket
MAX_HEX_RANDOM_LENGTH = 16
def configure(conf)
+ compat_parameters_convert(conf, :buffer, :formatter, :inject)
+
super
if @s3_endpoint && @s3_endpoint.end_with?('amazonaws.com')
- raise ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
+ raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
end
begin
- @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(:buffer_type => @buffer_type, :log => log)
+ buffer_type = @buffer_config[:@type]
+ @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, log: log)
rescue
- $log.warn "#{@store_as} not found. Use 'text' instead"
+ log.warn "#{@store_as} not found. Use 'text' instead"
@compressor = TextCompressor.new
end
@compressor.configure(conf)
- @formatter = Plugin.new_formatter(@format)
- @formatter.configure(conf)
+ @formatter = formatter_create
- if @localtime
- @path_slicer = Proc.new {|path|
- Time.now.strftime(path)
- }
- else
- @path_slicer = Proc.new {|path|
- Time.now.utc.strftime(path)
- }
- end
-
if @hex_random_length > MAX_HEX_RANDOM_LENGTH
- raise ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
+ raise Fluent::ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
end
if @reduced_redundancy
- $log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
+ log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
@storage_class = "REDUCED_REDUNDANCY"
end
- @path = process_deprecated_placeholders(@path)
- @s3_object_key_format = process_deprecated_placeholders(@s3_object_key_format)
- if !@check_object
- if conf.has_key?('s3_object_key_format')
- log.warn "Set 'check_object false' and s3_object_key_format is specified. Check s3_object_key_format is unique in each write. If not, existing file will be overwritten."
- else
- log.warn "Set 'check_object false' and s3_object_key_format is not specified. Use '%{path}/%{date_slice}_%{hms_slice}.%{file_extension}' for s3_object_key_format"
- @s3_object_key_format = "%{path}/%{date_slice}_%{hms_slice}.%{file_extension}"
- end
- end
+ @s3_object_key_format = process_s3_object_key_format
+ # For backward compatibility
+ # TODO: Remove time_slice_format when end of support compat_parameters
+ @configured_time_slice_format = conf['time_slice_format']
@values_for_s3_object_chunk = {}
end
def start
options = setup_credentials
@@ -193,52 +157,52 @@
options[:endpoint] = @s3_endpoint if @s3_endpoint
options[:http_proxy] = @proxy_uri if @proxy_uri
options[:force_path_style] = @force_path_style
options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil?
options[:signature_version] = @signature_version unless @signature_version.nil?
- options[:ssl_verify_peer] = @ssl_verify_peer
- log.on_trace do
- options[:http_wire_trace] = true
- options[:logger] = log
- end
s3_client = Aws::S3::Client.new(options)
- @s3 = Aws::S3::Resource.new(:client => s3_client)
+ @s3 = Aws::S3::Resource.new(client: s3_client)
@bucket = @s3.bucket(@s3_bucket)
check_apikeys if @check_apikey_on_start
ensure_bucket if @check_bucket
+ if !@check_object
+ @s3_object_key_format = "%{path}/%{date_slice}_%{hms_slice}.%{file_extension}"
+ end
+
super
end
def format(tag, time, record)
- @formatter.format(tag, time, record)
+ r = inject_values_to_record(tag, time, record)
+ @formatter.format(tag, time, r)
end
def write(chunk)
i = 0
previous_path = nil
+ time_slice_format = @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey'])
+ time_slice = Time.at(chunk.metadata.timekey).utc.strftime(time_slice_format)
if @check_object
begin
- path = @path_slicer.call(@path)
+ path = extract_placeholders(@path, chunk.metadata)
@values_for_s3_object_chunk[chunk.unique_id] ||= {
- "hex_random" => hex_random(chunk),
+ "%{hex_random}" => hex_random(chunk),
}
values_for_s3_object_key = {
- "path" => path,
- "time_slice" => chunk.key,
- "file_extension" => @compressor.ext,
- "index" => i,
+ "%{path}" => path,
+ "%{time_slice}" => time_slice,
+ "%{file_extension}" => @compressor.ext,
+ "%{index}" => i,
}.merge!(@values_for_s3_object_chunk[chunk.unique_id])
- values_for_s3_object_key['uuid_flush'.freeze] = uuid_random if @uuid_flush_enabled
+ values_for_s3_object_key["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled
- s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
- values_for_s3_object_key[expr[2...expr.size-1]]
- }
+ s3path = @s3_object_key_format.gsub(%r(%{[^}]+}), values_for_s3_object_key)
if (i > 0) && (s3path == previous_path)
if @overwrite
log.warn "#{s3path} already exists, but will overwrite"
break
else
@@ -254,95 +218,99 @@
hms_slicer = Time.now.strftime("%H%M%S")
else
hms_slicer = Time.now.utc.strftime("%H%M%S")
end
- values_for_s3_object_key = {
- "path" => @path_slicer.call(@path),
- "time_slice" => chunk.key,
- "date_slice" => chunk.key,
- "file_extension" => @compressor.ext,
- "hms_slice" => hms_slicer,
+ path = extract_placeholders(@path, chunk.metadata)
+
+ @values_for_s3_object_chunk[chunk.unique_id] ||= {
+ "%{hex_random}" => hex_random(chunk),
}
- s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
- values_for_s3_object_key[expr[2...expr.size-1]]
- }
+ values_for_s3_object_key = {
+ "%{path}" => path,
+ "%{time_slice}" => time_slice,
+ "%{file_extension}" => @compressor.ext,
+ }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
+ values_for_s3_object_key["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled
+
+ s3path = @s3_object_key_format.gsub(%r(%{[^}]+}), values_for_s3_object_key)
end
- tmp = Tempfile.new("s3-", @tmp_dir)
+ tmp = Tempfile.new("s3-")
tmp.binmode
begin
@compressor.compress(chunk, tmp)
tmp.rewind
- log.debug { "out_s3: write chunk: {key:#{chunk.key},unique_id:#{unique_hex(chunk)}} to s3://#{@s3_bucket}/#{s3path}" }
+ log.debug { "out_s3: write chunk: {key:#{chunk.key},tsuffix:#{tsuffix(chunk)}} to s3://#{@s3_bucket}/#{s3path}" }
put_options = {
- :body => tmp,
- :content_type => @compressor.content_type,
- :storage_class => @storage_class,
+ body: tmp,
+ content_type: @compressor.content_type,
+ storage_class: @storage_class,
}
put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
put_options[:sse_customer_algorithm] = @sse_customer_algorithm if @sse_customer_algorithm
put_options[:sse_customer_key] = @sse_customer_key if @sse_customer_key
put_options[:sse_customer_key_md5] = @sse_customer_key_md5 if @sse_customer_key_md5
put_options[:acl] = @acl if @acl
- put_options[:grant_full_control] = @grant_full_control if @grant_full_control
- put_options[:grant_read] = @grant_read if @grant_read
- put_options[:grant_read_acp] = @grant_read_acp if @grant_read_acp
- put_options[:grant_write_acp] = @grant_write_acp if @grant_write_acp
@bucket.object(s3path).put(put_options)
@values_for_s3_object_chunk.delete(chunk.unique_id)
if @warn_for_delay
- if Time.strptime(chunk.key, @time_slice_format) < Time.now - @warn_for_delay
+ if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
log.warn { "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}" }
end
end
ensure
tmp.close(true) rescue nil
end
end
private
- # v0.14 has a useful Fluent::UniqueId.hex(unique_id) method, though
- def unique_hex(chunk)
- unique_id = chunk.unique_id
- unique_id.unpack('C*').map {|x| x.to_s(16) }.join('')
- end
-
def hex_random(chunk)
- unique_hex = unique_hex(chunk)
+ unique_hex = Fluent::UniqueId.hex(chunk.unique_id)
unique_hex.reverse! # unique_hex is like (time_sec, time_usec, rand) => reversing gives more randomness
unique_hex[0...@hex_random_length]
end
def uuid_random
::UUIDTools::UUID.random_create.to_s
end
+ # This is stolen from Fluentd
+ def timekey_to_timeformat(timekey)
+ case timekey
+ when nil then ''
+ when 0...60 then '%Y%m%d%H%M%S' # 60 exclusive
+ when 60...3600 then '%Y%m%d%H%M'
+ when 3600...86400 then '%Y%m%d%H'
+ else '%Y%m%d'
+ end
+ end
+
def ensure_bucket
if !@bucket.exists?
if @auto_create_bucket
log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
- @s3.create_bucket(:bucket => @s3_bucket)
+ @s3.create_bucket(bucket: @s3_bucket)
else
raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
end
end
end
- def process_deprecated_placeholders(target_path)
+ def process_s3_object_key_format
%W(%{uuid} %{uuid:random} %{uuid:hostname} %{uuid:timestamp}).each { |ph|
- if target_path.include?(ph)
- raise ConfigError, %!#{ph} placeholder is removed!
+ if @s3_object_key_format.include?(ph)
+ raise ConfigError, %!#{ph} placeholder in s3_object_key_format is removed!
end
}
- if target_path.include?('%{uuid_flush}')
+ if @s3_object_key_format.include?('%{uuid_flush}')
# test uuidtools works or not
begin
require 'uuidtools'
rescue LoadError
raise ConfigError, "uuidtools gem not found. Install uuidtools gem first"
@@ -353,22 +321,22 @@
raise ConfigError, "Generating uuid doesn't work. Can't use %{uuid_flush} on this environment. #{e}"
end
@uuid_flush_enabled = true
end
- target_path.gsub('%{hostname}') { |expr|
+ @s3_object_key_format.gsub('%{hostname}') { |expr|
log.warn "%{hostname} will be removed in the future. Use \"\#{Socket.gethostname}\" instead"
Socket.gethostname
}
end
def check_apikeys
- @bucket.objects(:prefix => @path, :max_keys => 1).first
+ @bucket.objects(prefix: @path).first
rescue Aws::S3::Errors::NoSuchBucket
# ignore NoSuchBucket Error because ensure_bucket checks it.
rescue => e
- raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{e.inspect}"
+ raise "can't call S3 API. Please check your aws_key_id / aws_sec_key or s3_region configuration. error = #{e.inspect}"
end
def setup_credentials
options = {}
credentials_options = {}
@@ -382,11 +350,11 @@
credentials_options[:role_session_name] = c.role_session_name
credentials_options[:policy] = c.policy if c.policy
credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
credentials_options[:external_id] = c.external_id if c.external_id
if @s3_region
- credentials_options[:client] = Aws::STS::Client.new(:region => @s3_region)
+ credentials_options[:client] = Aws::STS::Client.new(region: @s3_region)
end
options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options)
when @instance_profile_credentials
c = @instance_profile_credentials
credentials_options[:retries] = c.retries if c.retries
@@ -403,11 +371,11 @@
c = @shared_credentials
credentials_options[:path] = c.path if c.path
credentials_options[:profile_name] = c.profile_name if c.profile_name
options[:credentials] = Aws::SharedCredentials.new(credentials_options)
when @aws_iam_retries
- $log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead")
+ log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead")
credentials_options[:retries] = @aws_iam_retries
if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
options[:credentials] = Aws::ECSCredentials.new(credentials_options)
else
options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
@@ -418,14 +386,12 @@
end
options
end
class Compressor
- include Configurable
+ include Fluent::Configurable
- config_param :tmp_dir, :string, :default => nil
-
def initialize(opts = {})
super()
@buffer_type = opts[:buffer_type]
@log = opts[:log]
end
@@ -452,11 +418,11 @@
algo = command if algo.nil?
begin
Open3.capture3("#{command} -V")
rescue Errno::ENOENT
- raise ConfigError, "'#{command}' utility must be in PATH for #{algo} compression"
+ raise Fluent::ConfigError, "'#{command}' utility must be in PATH for #{algo} compression"
end
end
end
class GzipCompressor < Compressor
@@ -499,10 +465,10 @@
def content_type
'application/json'.freeze
end
end
- COMPRESSOR_REGISTRY = Registry.new(:s3_compressor_type, 'fluent/plugin/s3_compressor_')
+ COMPRESSOR_REGISTRY = Fluent::Registry.new(:s3_compressor_type, 'fluent/plugin/s3_compressor_')
{
'gzip' => GzipCompressor,
'json' => JsonCompressor,
'text' => TextCompressor
}.each { |name, compressor|