lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.8.8 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-1.0.0.rc1

- old
+ new

@@ -1,191 +1,155 @@ require 'fluent/output' -require 'fluent/log-ext' +require 'aws-sdk-resources' +require 'zlib' +require 'time' +require 'tempfile' -module Fluent - class S3Output < Fluent::TimeSlicedOutput +module Fluent::Plugin + class S3Output < Fluent::Plugin::Output Fluent::Plugin.register_output('s3', self) + helpers :compat_parameters, :formatter, :inject + def initialize super - require 'aws-sdk-resources' - require 'zlib' - require 'time' - require 'tempfile' - @compressor = nil @uuid_flush_enabled = false end - # For fluentd v0.12.16 or earlier - class << self - unless method_defined?(:desc) - def desc(description) - end - end - end - unless Fluent::Config::ConfigureProxy.method_defined?(:desc) - Fluent::Config::ConfigureProxy.class_eval do - def desc(description) - end - end - end - desc "Path prefix of the files on S3" - config_param :path, :string, :default => "" + config_param :path, :string, default: "" desc "The Server-side encryption algorithm used when storing this object in S3 (AES256, aws:kms)" - config_param :use_server_side_encryption, :string, :default => nil + config_param :use_server_side_encryption, :string, default: nil desc "AWS access key id" - config_param :aws_key_id, :string, :default => nil, :secret => true + config_param :aws_key_id, :string, default: nil, secret: true desc "AWS secret key." - config_param :aws_sec_key, :string, :default => nil, :secret => true - config_section :assume_role_credentials, :multi => false do + config_param :aws_sec_key, :string, default: nil, secret: true + config_section :assume_role_credentials, multi: false do desc "The Amazon Resource Name (ARN) of the role to assume" - config_param :role_arn, :string, :secret => true + config_param :role_arn, :string, secret: true desc "An identifier for the assumed role session" config_param :role_session_name, :string desc "An IAM policy in JSON format" - config_param :policy, :string, :default => nil + config_param :policy, :string, default: nil desc "The duration, in seconds, of the role session (900-3600)" - config_param :duration_seconds, :integer, :default => nil + config_param :duration_seconds, :integer, default: nil desc "A unique identifier that is used by third parties when assuming roles in their customers' accounts." - config_param :external_id, :string, :default => nil, :secret => true + config_param :external_id, :string, default: nil, secret: true end - config_section :instance_profile_credentials, :multi => false do + config_section :instance_profile_credentials, multi: false do desc "Number of times to retry when retrieving credentials" - config_param :retries, :integer, :default => nil + config_param :retries, :integer, default: nil desc "IP address (default:169.254.169.254)" - config_param :ip_address, :string, :default => nil + config_param :ip_address, :string, default: nil desc "Port number (default:80)" - config_param :port, :integer, :default => nil + config_param :port, :integer, default: nil desc "Number of seconds to wait for the connection to open" - config_param :http_open_timeout, :float, :default => nil + config_param :http_open_timeout, :float, default: nil desc "Number of seconds to wait for one block to be read" - config_param :http_read_timeout, :float, :default => nil + config_param :http_read_timeout, :float, default: nil # config_param :delay, :integer or :proc, :default => nil # config_param :http_degub_output, :io, :default => nil end - config_section :shared_credentials, :multi => false do + config_section :shared_credentials, multi: false do desc "Path to the shared file. (default: $HOME/.aws/credentials)" - config_param :path, :string, :default => nil + config_param :path, :string, default: nil desc "Profile name. Default to 'default' or ENV['AWS_PROFILE']" - config_param :profile_name, :string, :default => nil + config_param :profile_name, :string, default: nil end desc "The number of attempts to load instance profile credentials from the EC2 metadata service using IAM role" - config_param :aws_iam_retries, :integer, :default => nil + config_param :aws_iam_retries, :integer, default: nil, deprecated: "Use 'instance_profile_credentials' instead" desc "S3 bucket name" config_param :s3_bucket, :string desc "S3 region name" - config_param :s3_region, :string, :default => ENV["AWS_REGION"] || "us-east-1" + config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1" desc "Use 's3_region' instead" - config_param :s3_endpoint, :string, :default => nil - desc "If false, the certificate of endpoint will not be verified" - config_param :ssl_verify_peer, :bool, :default => true + config_param :s3_endpoint, :string, default: nil desc "The format of S3 object keys" - config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}" + config_param :s3_object_key_format, :string, default: "%{path}%{time_slice}_%{index}.%{file_extension}" desc "If true, the bucket name is always left in the request URI and never moved to the host as a sub-domain" - config_param :force_path_style, :bool, :default => false + config_param :force_path_style, :bool, default: false desc "Archive format on S3" - config_param :store_as, :string, :default => "gzip" + config_param :store_as, :string, default: "gzip" desc "Create S3 bucket if it does not exists" - config_param :auto_create_bucket, :bool, :default => true + config_param :auto_create_bucket, :bool, default: true desc "Check AWS key on start" - config_param :check_apikey_on_start, :bool, :default => true + config_param :check_apikey_on_start, :bool, default: true desc "URI of proxy environment" - config_param :proxy_uri, :string, :default => nil + config_param :proxy_uri, :string, default: nil desc "Use S3 reduced redundancy storage for 33% cheaper pricing. Deprecated. Use storage_class instead" - config_param :reduced_redundancy, :bool, :default => false + config_param :reduced_redundancy, :bool, default: false, deprecated: "Use storage_class parameter instead." desc "The type of storage to use for the object(STANDARD,REDUCED_REDUNDANCY,STANDARD_IA)" - config_param :storage_class, :string, :default => "STANDARD" - desc "Change one line format in the S3 object (out_file,json,ltsv,single_value)" - config_param :format, :string, :default => 'out_file' + config_param :storage_class, :string, default: "STANDARD" desc "Permission for the object in S3" - config_param :acl, :string, :default => nil - desc "Allows grantee READ, READ_ACP, and WRITE_ACP permissions on the object" - config_param :grant_full_control, :string, :default => nil - desc "Allows grantee to read the object data and its metadata" - config_param :grant_read, :string, :default => nil - desc "Allows grantee to read the object ACL" - config_param :grant_read_acp, :string, :default => nil - desc "Allows grantee to write the ACL for the applicable object" - config_param :grant_write_acp, :string, :default => nil + config_param :acl, :string, default: nil desc "The length of `%{hex_random}` placeholder(4-16)" - config_param :hex_random_length, :integer, :default => 4 + config_param :hex_random_length, :integer, default: 4 desc "Overwrite already existing path" - config_param :overwrite, :bool, :default => false + config_param :overwrite, :bool, default: false desc "Check bucket if exists or not" - config_param :check_bucket, :bool, :default => true + config_param :check_bucket, :bool, default: true desc "Check object before creation" - config_param :check_object, :bool, :default => true + config_param :check_object, :bool, default: true desc "Specifies the AWS KMS key ID to use for object encryption" - config_param :ssekms_key_id, :string, :default => nil, :secret => true + config_param :ssekms_key_id, :string, default: nil, secret: true desc "Specifies the algorithm to use to when encrypting the object" - config_param :sse_customer_algorithm, :string, :default => nil + config_param :sse_customer_algorithm, :string, default: nil desc "Specifies the customer-provided encryption key for Amazon S3 to use in encrypting data" - config_param :sse_customer_key, :string, :default => nil, :secret => true + config_param :sse_customer_key, :string, default: nil, secret: true desc "Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321" - config_param :sse_customer_key_md5, :string, :default => nil, :secret => true + config_param :sse_customer_key_md5, :string, default: nil, secret: true desc "AWS SDK uses MD5 for API request/response by default" - config_param :compute_checksums, :bool, :default => nil # use nil to follow SDK default configuration + config_param :compute_checksums, :bool, default: nil # use nil to follow SDK default configuration desc "Signature version for API Request (s3,v4)" - config_param :signature_version, :string, :default => nil # use nil to follow SDK default configuration + config_param :signature_version, :string, default: nil # use nil to follow SDK default configuration desc "Given a threshold to treat events as delay, output warning logs if delayed events were put into s3" - config_param :warn_for_delay, :time, :default => nil - desc "Directory for temporary files, instead of system temp directory." - config_param :tmp_dir, :string, :default => nil + config_param :warn_for_delay, :time, default: nil + DEFAULT_FORMAT_TYPE = "out_file" + + config_section :format do + config_set_default :@type, DEFAULT_FORMAT_TYPE + end + attr_reader :bucket MAX_HEX_RANDOM_LENGTH = 16 def configure(conf) + compat_parameters_convert(conf, :buffer, :formatter, :inject) + super if @s3_endpoint && @s3_endpoint.end_with?('amazonaws.com') - raise ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services" + raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services" end begin - @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(:buffer_type => @buffer_type, :log => log) + buffer_type = @buffer_config[:@type] + @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, log: log) rescue - $log.warn "#{@store_as} not found. Use 'text' instead" + log.warn "#{@store_as} not found. Use 'text' instead" @compressor = TextCompressor.new end @compressor.configure(conf) - @formatter = Plugin.new_formatter(@format) - @formatter.configure(conf) + @formatter = formatter_create - if @localtime - @path_slicer = Proc.new {|path| - Time.now.strftime(path) - } - else - @path_slicer = Proc.new {|path| - Time.now.utc.strftime(path) - } - end - if @hex_random_length > MAX_HEX_RANDOM_LENGTH - raise ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}" + raise Fluent::ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}" end if @reduced_redundancy - $log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead" + log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead" @storage_class = "REDUCED_REDUNDANCY" end - @path = process_deprecated_placeholders(@path) - @s3_object_key_format = process_deprecated_placeholders(@s3_object_key_format) - if !@check_object - if conf.has_key?('s3_object_key_format') - log.warn "Set 'check_object false' and s3_object_key_format is specified. Check s3_object_key_format is unique in each write. If not, existing file will be overwritten." - else - log.warn "Set 'check_object false' and s3_object_key_format is not specified. Use '%{path}/%{date_slice}_%{hms_slice}.%{file_extension}' for s3_object_key_format" - @s3_object_key_format = "%{path}/%{date_slice}_%{hms_slice}.%{file_extension}" - end - end + @s3_object_key_format = process_s3_object_key_format + # For backward compatibility + # TODO: Remove time_slice_format when end of support compat_parameters + @configured_time_slice_format = conf['time_slice_format'] @values_for_s3_object_chunk = {} end def start options = setup_credentials @@ -193,52 +157,52 @@ options[:endpoint] = @s3_endpoint if @s3_endpoint options[:http_proxy] = @proxy_uri if @proxy_uri options[:force_path_style] = @force_path_style options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil? options[:signature_version] = @signature_version unless @signature_version.nil? - options[:ssl_verify_peer] = @ssl_verify_peer - log.on_trace do - options[:http_wire_trace] = true - options[:logger] = log - end s3_client = Aws::S3::Client.new(options) - @s3 = Aws::S3::Resource.new(:client => s3_client) + @s3 = Aws::S3::Resource.new(client: s3_client) @bucket = @s3.bucket(@s3_bucket) check_apikeys if @check_apikey_on_start ensure_bucket if @check_bucket + if !@check_object + @s3_object_key_format = "%{path}/%{date_slice}_%{hms_slice}.%{file_extension}" + end + super end def format(tag, time, record) - @formatter.format(tag, time, record) + r = inject_values_to_record(tag, time, record) + @formatter.format(tag, time, r) end def write(chunk) i = 0 previous_path = nil + time_slice_format = @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']) + time_slice = Time.at(chunk.metadata.timekey).utc.strftime(time_slice_format) if @check_object begin - path = @path_slicer.call(@path) + path = extract_placeholders(@path, chunk.metadata) @values_for_s3_object_chunk[chunk.unique_id] ||= { - "hex_random" => hex_random(chunk), + "%{hex_random}" => hex_random(chunk), } values_for_s3_object_key = { - "path" => path, - "time_slice" => chunk.key, - "file_extension" => @compressor.ext, - "index" => i, + "%{path}" => path, + "%{time_slice}" => time_slice, + "%{file_extension}" => @compressor.ext, + "%{index}" => i, }.merge!(@values_for_s3_object_chunk[chunk.unique_id]) - values_for_s3_object_key['uuid_flush'.freeze] = uuid_random if @uuid_flush_enabled + values_for_s3_object_key["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled - s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr| - values_for_s3_object_key[expr[2...expr.size-1]] - } + s3path = @s3_object_key_format.gsub(%r(%{[^}]+}), values_for_s3_object_key) if (i > 0) && (s3path == previous_path) if @overwrite log.warn "#{s3path} already exists, but will overwrite" break else @@ -254,95 +218,99 @@ hms_slicer = Time.now.strftime("%H%M%S") else hms_slicer = Time.now.utc.strftime("%H%M%S") end - values_for_s3_object_key = { - "path" => @path_slicer.call(@path), - "time_slice" => chunk.key, - "date_slice" => chunk.key, - "file_extension" => @compressor.ext, - "hms_slice" => hms_slicer, + path = extract_placeholders(@path, chunk.metadata) + + @values_for_s3_object_chunk[chunk.unique_id] ||= { + "%{hex_random}" => hex_random(chunk), } - s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr| - values_for_s3_object_key[expr[2...expr.size-1]] - } + values_for_s3_object_key = { + "%{path}" => path, + "%{time_slice}" => time_slice, + "%{file_extension}" => @compressor.ext, + }.merge!(@values_for_s3_object_chunk[chunk.unique_id]) + values_for_s3_object_key["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled + + s3path = @s3_object_key_format.gsub(%r(%{[^}]+}), values_for_s3_object_key) end - tmp = Tempfile.new("s3-", @tmp_dir) + tmp = Tempfile.new("s3-") tmp.binmode begin @compressor.compress(chunk, tmp) tmp.rewind - log.debug { "out_s3: write chunk: {key:#{chunk.key},unique_id:#{unique_hex(chunk)}} to s3://#{@s3_bucket}/#{s3path}" } + log.debug { "out_s3: write chunk: {key:#{chunk.key},tsuffix:#{tsuffix(chunk)}} to s3://#{@s3_bucket}/#{s3path}" } put_options = { - :body => tmp, - :content_type => @compressor.content_type, - :storage_class => @storage_class, + body: tmp, + content_type: @compressor.content_type, + storage_class: @storage_class, } put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id put_options[:sse_customer_algorithm] = @sse_customer_algorithm if @sse_customer_algorithm put_options[:sse_customer_key] = @sse_customer_key if @sse_customer_key put_options[:sse_customer_key_md5] = @sse_customer_key_md5 if @sse_customer_key_md5 put_options[:acl] = @acl if @acl - put_options[:grant_full_control] = @grant_full_control if @grant_full_control - put_options[:grant_read] = @grant_read if @grant_read - put_options[:grant_read_acp] = @grant_read_acp if @grant_read_acp - put_options[:grant_write_acp] = @grant_write_acp if @grant_write_acp @bucket.object(s3path).put(put_options) @values_for_s3_object_chunk.delete(chunk.unique_id) if @warn_for_delay - if Time.strptime(chunk.key, @time_slice_format) < Time.now - @warn_for_delay + if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay log.warn { "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}" } end end ensure tmp.close(true) rescue nil end end private - # v0.14 has a useful Fluent::UniqueId.hex(unique_id) method, though - def unique_hex(chunk) - unique_id = chunk.unique_id - unique_id.unpack('C*').map {|x| x.to_s(16) }.join('') - end - def hex_random(chunk) - unique_hex = unique_hex(chunk) + unique_hex = Fluent::UniqueId.hex(chunk.unique_id) unique_hex.reverse! # unique_hex is like (time_sec, time_usec, rand) => reversing gives more randomness unique_hex[0...@hex_random_length] end def uuid_random ::UUIDTools::UUID.random_create.to_s end + # This is stolen from Fluentd + def timekey_to_timeformat(timekey) + case timekey + when nil then '' + when 0...60 then '%Y%m%d%H%M%S' # 60 exclusive + when 60...3600 then '%Y%m%d%H%M' + when 3600...86400 then '%Y%m%d%H' + else '%Y%m%d' + end + end + def ensure_bucket if !@bucket.exists? if @auto_create_bucket log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}" - @s3.create_bucket(:bucket => @s3_bucket) + @s3.create_bucket(bucket: @s3_bucket) else raise "The specified bucket does not exist: bucket = #{@s3_bucket}" end end end - def process_deprecated_placeholders(target_path) + def process_s3_object_key_format %W(%{uuid} %{uuid:random} %{uuid:hostname} %{uuid:timestamp}).each { |ph| - if target_path.include?(ph) - raise ConfigError, %!#{ph} placeholder is removed! + if @s3_object_key_format.include?(ph) + raise ConfigError, %!#{ph} placeholder in s3_object_key_format is removed! end } - if target_path.include?('%{uuid_flush}') + if @s3_object_key_format.include?('%{uuid_flush}') # test uuidtools works or not begin require 'uuidtools' rescue LoadError raise ConfigError, "uuidtools gem not found. Install uuidtools gem first" @@ -353,22 +321,22 @@ raise ConfigError, "Generating uuid doesn't work. Can't use %{uuid_flush} on this environment. #{e}" end @uuid_flush_enabled = true end - target_path.gsub('%{hostname}') { |expr| + @s3_object_key_format.gsub('%{hostname}') { |expr| log.warn "%{hostname} will be removed in the future. Use \"\#{Socket.gethostname}\" instead" Socket.gethostname } end def check_apikeys - @bucket.objects(:prefix => @path, :max_keys => 1).first + @bucket.objects(prefix: @path).first rescue Aws::S3::Errors::NoSuchBucket # ignore NoSuchBucket Error because ensure_bucket checks it. rescue => e - raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{e.inspect}" + raise "can't call S3 API. Please check your aws_key_id / aws_sec_key or s3_region configuration. error = #{e.inspect}" end def setup_credentials options = {} credentials_options = {} @@ -382,11 +350,11 @@ credentials_options[:role_session_name] = c.role_session_name credentials_options[:policy] = c.policy if c.policy credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds credentials_options[:external_id] = c.external_id if c.external_id if @s3_region - credentials_options[:client] = Aws::STS::Client.new(:region => @s3_region) + credentials_options[:client] = Aws::STS::Client.new(region: @s3_region) end options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options) when @instance_profile_credentials c = @instance_profile_credentials credentials_options[:retries] = c.retries if c.retries @@ -403,11 +371,11 @@ c = @shared_credentials credentials_options[:path] = c.path if c.path credentials_options[:profile_name] = c.profile_name if c.profile_name options[:credentials] = Aws::SharedCredentials.new(credentials_options) when @aws_iam_retries - $log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead") + log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead") credentials_options[:retries] = @aws_iam_retries if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"] options[:credentials] = Aws::ECSCredentials.new(credentials_options) else options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options) @@ -418,14 +386,12 @@ end options end class Compressor - include Configurable + include Fluent::Configurable - config_param :tmp_dir, :string, :default => nil - def initialize(opts = {}) super() @buffer_type = opts[:buffer_type] @log = opts[:log] end @@ -452,11 +418,11 @@ algo = command if algo.nil? begin Open3.capture3("#{command} -V") rescue Errno::ENOENT - raise ConfigError, "'#{command}' utility must be in PATH for #{algo} compression" + raise Fluent::ConfigError, "'#{command}' utility must be in PATH for #{algo} compression" end end end class GzipCompressor < Compressor @@ -499,10 +465,10 @@ def content_type 'application/json'.freeze end end - COMPRESSOR_REGISTRY = Registry.new(:s3_compressor_type, 'fluent/plugin/s3_compressor_') + COMPRESSOR_REGISTRY = Fluent::Registry.new(:s3_compressor_type, 'fluent/plugin/s3_compressor_') { 'gzip' => GzipCompressor, 'json' => JsonCompressor, 'text' => TextCompressor }.each { |name, compressor|