lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.3.7 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.4.0
- old
+ new
@@ -1,219 +1,193 @@
module Fluent
+ require 'fluent/mixin/config_placeholders'
-require 'fluent/mixin/config_placeholders'
+ class S3Output < Fluent::TimeSlicedOutput
+ Fluent::Plugin.register_output('s3', self)
-class S3Output < Fluent::TimeSlicedOutput
- Fluent::Plugin.register_output('s3', self)
+ unless method_defined?(:log)
+ define_method(:log) { $log }
+ end
- unless method_defined?(:log)
- define_method(:log) { $log }
- end
+ def initialize
+ super
+ require 'aws-sdk'
+ require 'zlib'
+ require 'time'
+ require 'tempfile'
+ require 'open3'
- def initialize
- super
- require 'aws-sdk'
- require 'zlib'
- require 'time'
- require 'tempfile'
- require 'open3'
+ @use_ssl = true
+ end
- @use_ssl = true
- end
+ config_param :path, :string, :default => ""
- config_param :path, :string, :default => ""
- config_param :time_format, :string, :default => nil
+ config_param :aws_key_id, :string, :default => nil
+ config_param :aws_sec_key, :string, :default => nil
+ config_param :s3_bucket, :string
+ config_param :s3_endpoint, :string, :default => nil
+ config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}"
+ config_param :store_as, :string, :default => "gzip"
+ config_param :command_parameter, :string, :default => nil
+ config_param :auto_create_bucket, :bool, :default => true
+ config_param :check_apikey_on_start, :bool, :default => true
+ config_param :proxy_uri, :string, :default => nil
+ config_param :reduced_redundancy, :bool, :default => false
+ config_param :format, :string, :default => 'out_file'
- include SetTagKeyMixin
- config_set_default :include_tag_key, false
+ attr_reader :bucket
- include SetTimeKeyMixin
- config_set_default :include_time_key, false
+ include Fluent::Mixin::ConfigPlaceholders
- config_param :aws_key_id, :string, :default => nil
- config_param :aws_sec_key, :string, :default => nil
- config_param :s3_bucket, :string
- config_param :s3_endpoint, :string, :default => nil
- config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}"
- config_param :store_as, :string, :default => "gzip"
- config_param :command_parameter, :string, :default => nil
- config_param :auto_create_bucket, :bool, :default => true
- config_param :check_apikey_on_start, :bool, :default => true
- config_param :proxy_uri, :string, :default => nil
- config_param :reduced_redundancy, :bool, :default => false
+ def placeholders
+ [:percent]
+ end
- attr_reader :bucket
+ def configure(conf)
+ super
- include Fluent::Mixin::ConfigPlaceholders
+ if use_ssl = conf['use_ssl']
+ if use_ssl.empty?
+ @use_ssl = true
+ else
+ @use_ssl = Config.bool_value(use_ssl)
+ if @use_ssl.nil?
+ raise ConfigError, "'true' or 'false' is required for use_ssl option on s3 output"
+ end
+ end
+ end
- def placeholders
- [:percent]
- end
+ @ext, @mime_type = case @store_as
+ when 'gzip'
+ ['gz', 'application/x-gzip']
+ when 'lzo'
+ check_command('lzop', 'LZO')
+ @command_parameter = '-qf1' if @command_parameter.nil?
+ ['lzo', 'application/x-lzop']
+ when 'lzma2'
+ check_command('xz', 'LZMA2')
+ @command_parameter = '-qf0' if @command_parameter.nil?
+ ['xz', 'application/x-xz']
+ when 'json'
+ ['json', 'application/json']
+ else
+ ['txt', 'text/plain']
+ end
- def configure(conf)
- super
+ if format_json = conf['format_json']
+ $log.warn "format_json is deprecated. Use 'format json' instead"
+ conf['format'] = 'json'
+ else
+ conf['format'] = @format
+ end
+ @formatter = TextFormatter.create(conf)
- if format_json = conf['format_json']
- @format_json = true
- else
- @format_json = false
- end
-
- if use_ssl = conf['use_ssl']
- if use_ssl.empty?
- @use_ssl = true
+ if @localtime
+ @path_slicer = Proc.new {|path|
+ Time.now.strftime(path)
+ }
else
- @use_ssl = Config.bool_value(use_ssl)
- if @use_ssl.nil?
- raise ConfigError, "'true' or 'false' is required for use_ssl option on s3 output"
- end
+ @path_slicer = Proc.new {|path|
+ Time.now.utc.strftime(path)
+ }
end
end
- @ext, @mime_type = case @store_as
- when 'gzip'
- ['gz', 'application/x-gzip']
- when 'lzo'
- check_command('lzop', 'LZO')
- @command_parameter = '-qf1' if @command_parameter.nil?
- ['lzo', 'application/x-lzop']
- when 'lzma2'
- check_command('xz', 'LZMA2')
- @command_parameter = '-qf0' if @command_parameter.nil?
- ['xz', 'application/x-xz']
- when 'json'
- ['json', 'application/json']
- else
- ['txt', 'text/plain']
- end
+ def start
+ super
+ options = {}
+ if @aws_key_id && @aws_sec_key
+ options[:access_key_id] = @aws_key_id
+ options[:secret_access_key] = @aws_sec_key
+ end
+ options[:s3_endpoint] = @s3_endpoint if @s3_endpoint
+ options[:proxy_uri] = @proxy_uri if @proxy_uri
+ options[:use_ssl] = @use_ssl
- @timef = TimeFormatter.new(@time_format, @localtime)
+ @s3 = AWS::S3.new(options)
+ @bucket = @s3.buckets[@s3_bucket]
- if @localtime
- @path_slicer = Proc.new {|path|
- Time.now.strftime(path)
- }
- else
- @path_slicer = Proc.new {|path|
- Time.now.utc.strftime(path)
- }
+ check_apikeys if @check_apikey_on_start
+ ensure_bucket
end
- end
-
- def start
- super
- options = {}
- if @aws_key_id && @aws_sec_key
- options[:access_key_id] = @aws_key_id
- options[:secret_access_key] = @aws_sec_key
+ def format(tag, time, record)
+ @formatter.format(tag, time, record)
end
- options[:s3_endpoint] = @s3_endpoint if @s3_endpoint
- options[:proxy_uri] = @proxy_uri if @proxy_uri
- options[:use_ssl] = @use_ssl
- @s3 = AWS::S3.new(options)
- @bucket = @s3.buckets[@s3_bucket]
+ def write(chunk)
+ i = 0
- check_apikeys if @check_apikey_on_start
- ensure_bucket
- end
+ begin
+ path = @path_slicer.call(@path)
+ values_for_s3_object_key = {
+ "path" => path,
+ "time_slice" => chunk.key,
+ "file_extension" => @ext,
+ "index" => i
+ }
+ s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
+ values_for_s3_object_key[expr[2...expr.size-1]]
+ }
+ i += 1
+ end while @bucket.objects[s3path].exists?
- def format(tag, time, record)
- if @include_time_key || !@format_json
- time_str = @timef.format(time)
+ tmp = Tempfile.new("s3-")
+ begin
+ if @store_as == "gzip"
+ w = Zlib::GzipWriter.new(tmp)
+ chunk.write_to(w)
+ w.close
+ elsif @store_as == "lzo"
+ w = Tempfile.new("chunk-tmp")
+ chunk.write_to(w)
+ w.close
+ tmp.close
+        # We don't check the return code because we can't recover from lzop failure.
+ system "lzop #{@command_parameter} -o #{tmp.path} #{w.path}"
+ elsif @store_as == "lzma2"
+ w = Tempfile.new("chunk-xz-tmp")
+ chunk.write_to(w)
+ w.close
+ tmp.close
+ system "xz #{@command_parameter} -c #{w.path} > #{tmp.path}"
+ else
+ chunk.write_to(tmp)
+ tmp.close
+ end
+ @bucket.objects[s3path].write(Pathname.new(tmp.path), {:content_type => @mime_type,
+ :reduced_redundancy => @reduced_redundancy})
+ ensure
+ tmp.close(true) rescue nil
+ w.close rescue nil
+ w.unlink rescue nil
+ end
end
- # copied from each mixin because current TimeSlicedOutput can't support mixins.
- if @include_tag_key
- record[@tag_key] = tag
- end
- if @include_time_key
- record[@time_key] = time_str
- end
+ private
- if @format_json
- Yajl.dump(record) + "\n"
- else
- "#{time_str}\t#{tag}\t#{Yajl.dump(record)}\n"
+ def ensure_bucket
+ if !@bucket.exists?
+ if @auto_create_bucket
+ log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
+ @s3.buckets.create(@s3_bucket)
+ else
+ raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
+ end
+ end
end
- end
- def write(chunk)
- i = 0
-
- begin
- path = @path_slicer.call(@path)
- values_for_s3_object_key = {
- "path" => path,
- "time_slice" => chunk.key,
- "file_extension" => @ext,
- "index" => i
- }
- s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
- values_for_s3_object_key[expr[2...expr.size-1]]
- }
- i += 1
- end while @bucket.objects[s3path].exists?
-
- tmp = Tempfile.new("s3-")
- begin
- if @store_as == "gzip"
- w = Zlib::GzipWriter.new(tmp)
- chunk.write_to(w)
- w.close
- elsif @store_as == "lzo"
- w = Tempfile.new("chunk-tmp")
- chunk.write_to(w)
- w.close
- tmp.close
-      # We don't check the return code because we can't recover from lzop failure.
- system "lzop #{@command_parameter} -o #{tmp.path} #{w.path}"
- elsif @store_as == "lzma2"
- w = Tempfile.new("chunk-xz-tmp")
- chunk.write_to(w)
- w.close
- tmp.close
- system "xz #{@command_parameter} -c #{w.path} > #{tmp.path}"
- else
- chunk.write_to(tmp)
- tmp.close
- end
- @bucket.objects[s3path].write(Pathname.new(tmp.path), {:content_type => @mime_type,
- :reduced_redundancy => @reduced_redundancy})
- ensure
- tmp.close(true) rescue nil
- w.close rescue nil
- w.unlink rescue nil
+ def check_apikeys
+ @bucket.empty?
+ rescue
+ raise "aws_key_id or aws_sec_key is invalid. Please check your configuration"
end
- end
- private
-
- def ensure_bucket
- if !@bucket.exists?
- if @auto_create_bucket
- log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
- @s3.buckets.create(@s3_bucket)
- else
- raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
+ def check_command(command, algo)
+ begin
+ Open3.capture3("#{command} -V")
+ rescue Errno::ENOENT
+ raise ConfigError, "'#{command}' utility must be in PATH for #{algo} compression"
end
end
end
-
- def check_apikeys
- @bucket.empty?
- rescue
- raise "aws_key_id or aws_sec_key is invalid. Please check your configuration"
- end
-
- def check_command(command, algo)
- begin
- Open3.capture3("#{command} -V")
- rescue Errno::ENOENT
- raise ConfigError, "'#{command}' utility must be in PATH for #{algo} compression"
- end
- end
-end
-
-
end
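
Notes on notable 0.4.0 changes follow; the sketches are illustrative and not part of the plugin source.

The use_ssl handling in configure treats a bare `use_ssl` (empty value) as true and rejects anything Config.bool_value cannot parse. A minimal standalone sketch, assuming fluentd's Config.bool_value maps "true"/"false" to booleans and anything unrecognized to nil:

    require 'fluent/config'

    # Hypothetical helper mirroring the configure logic in the diff above.
    def parse_use_ssl(raw)
      return true if raw.nil? || raw.empty?   # option absent or bare `use_ssl` => default true
      parsed = Fluent::Config.bool_value(raw)
      raise Fluent::ConfigError, "'true' or 'false' is required for use_ssl option on s3 output" if parsed.nil?
      parsed
    end

    parse_use_ssl("")       # => true
    parse_use_ssl("false")  # => false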
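
The old format_json boolean is superseded by fluentd's pluggable TextFormatter: configure now copies @format (default 'out_file') into conf['format'], and a lingering format_json logs a deprecation warning and is rewritten to 'json'. A hypothetical configuration showing the migration (the match pattern, credentials, and bucket are placeholders):

    <match pattern.**>
      type s3
      aws_key_id YOUR_KEY_ID
      aws_sec_key YOUR_SECRET_KEY
      s3_bucket your-log-bucket
      path logs/
      # 0.3.x style, now deprecated (warns and behaves like `format json`):
      # format_json true
      # 0.4.0 style:
      format json
    </match>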
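
The @path_slicer proc simply runs the configured path through strftime (UTC unless localtime is set), so strftime patterns embedded in path are expanded at write time. A quick illustration; the output shown is just an example:

    path_slicer = Proc.new { |path| Time.now.utc.strftime(path) }
    path_slicer.call("logs/%Y/%m/%d/")  # => e.g. "logs/2013/09/27/"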
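
write() builds the object key by substituting %{...} placeholders in s3_object_key_format from a small hash, then retries with an incremented index while the key already exists in the bucket. A standalone sketch of the substitution step, with hypothetical sample values:

    # Sample values; in the plugin, "time_slice" comes from the buffer chunk key.
    values_for_s3_object_key = {
      "path"           => "logs/2013/09/27/",
      "time_slice"     => "20130927",
      "file_extension" => "gz",
      "index"          => 0
    }
    key_format = "%{path}%{time_slice}_%{index}.%{file_extension}"
    s3path = key_format.gsub(%r(%{[^}]+})) { |expr|
      values_for_s3_object_key[expr[2...expr.size - 1]]  # strip the "%{" and "}"
    }
    s3path  # => "logs/2013/09/27/20130927_0.gz"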