lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.4.3 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.5.0

- old
+ new

@@ -2,35 +2,28 @@ require 'fluent/mixin/config_placeholders' class S3Output < Fluent::TimeSlicedOutput Fluent::Plugin.register_output('s3', self) - unless method_defined?(:log) - define_method(:log) { $log } - end - def initialize super require 'aws-sdk' require 'zlib' require 'time' require 'tempfile' - require 'open3' - @use_ssl = true + @compressor = nil end config_param :path, :string, :default => "" - + config_param :use_ssl, :bool, :default => true config_param :aws_key_id, :string, :default => nil config_param :aws_sec_key, :string, :default => nil config_param :s3_bucket, :string config_param :s3_region, :string, :default => nil - config_param :s3_endpoint, :string, :default => nil config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}" config_param :store_as, :string, :default => "gzip" - config_param :command_parameter, :string, :default => nil config_param :auto_create_bucket, :bool, :default => true config_param :check_apikey_on_start, :bool, :default => true config_param :proxy_uri, :string, :default => nil config_param :reduced_redundancy, :bool, :default => false config_param :format, :string, :default => 'out_file' @@ -44,44 +37,24 @@ end def configure(conf) super - if use_ssl = conf['use_ssl'] - if use_ssl.empty? - @use_ssl = true - else - @use_ssl = Config.bool_value(use_ssl) - if @use_ssl.nil? - raise ConfigError, "'true' or 'false' is required for use_ssl option on s3 output" - end - end + if conf.has_key?('s3_endpoint') + raise ConfigError, "s3_endpoint parameter is removed. Use s3_region instead" end - @ext, @mime_type = case @store_as - when 'gzip' - ['gz', 'application/x-gzip'] - when 'lzo' - check_command('lzop', 'LZO') - @command_parameter = '-qf1' if @command_parameter.nil? - ['lzo', 'application/x-lzop'] - when 'lzma2' - check_command('xz', 'LZMA2') - @command_parameter = '-qf0' if @command_parameter.nil? - ['xz', 'application/x-xz'] - when 'json' - ['json', 'application/json'] - else - ['txt', 'text/plain'] - end - - if format_json = conf['format_json'] - $log.warn "format_json is deprecated. Use 'format json' instead" - conf['format'] = 'json' - else - conf['format'] = @format + begin + @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new + rescue => e + $log.warn "#{@store_as} not found. Use 'text' instead" + @compressor = TextCompressor.new end + @compressor.configure(conf) + + # TODO: use Plugin.new_formatter instead of TextFormatter.create + conf['format'] = @format @formatter = TextFormatter.create(conf) if @localtime @path_slicer = Proc.new {|path| Time.now.strftime(path) @@ -99,11 +72,10 @@ if @aws_key_id && @aws_sec_key options[:access_key_id] = @aws_key_id options[:secret_access_key] = @aws_sec_key end options[:region] = @s3_region if @s3_region - options[:endpoint] = @s3_endpoint if @s3_endpoint options[:proxy_uri] = @proxy_uri if @proxy_uri options[:use_ssl] = @use_ssl @s3 = AWS::S3.new(options) @bucket = @s3.buckets[@s3_bucket] @@ -123,11 +95,11 @@ begin path = @path_slicer.call(@path) values_for_s3_object_key = { "path" => path, "time_slice" => chunk.key, - "file_extension" => @ext, + "file_extension" => @compressor.ext, "index" => i } s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr| values_for_s3_object_key[expr[2...expr.size-1]] } @@ -139,37 +111,15 @@ previous_path = s3path end while @bucket.objects[s3path].exists? tmp = Tempfile.new("s3-") begin - if @store_as == "gzip" - w = Zlib::GzipWriter.new(tmp) - chunk.write_to(w) - w.close - elsif @store_as == "lzo" - w = Tempfile.new("chunk-tmp") - chunk.write_to(w) - w.close - tmp.close - # We don't check the return code because we can't recover lzop failure. - system "lzop #{@command_parameter} -o #{tmp.path} #{w.path}" - elsif @store_as == "lzma2" - w = Tempfile.new("chunk-xz-tmp") - chunk.write_to(w) - w.close - tmp.close - system "xz #{@command_parameter} -c #{w.path} > #{tmp.path}" - else - chunk.write_to(tmp) - tmp.close - end - @bucket.objects[s3path].write(Pathname.new(tmp.path), {:content_type => @mime_type, + @compressor.compress(chunk, tmp) + @bucket.objects[s3path].write(Pathname.new(tmp.path), {:content_type => @compressor.content_type, :reduced_redundancy => @reduced_redundancy}) ensure tmp.close(true) rescue nil - w.close rescue nil - w.unlink rescue nil end end private @@ -188,14 +138,93 @@ @bucket.empty? rescue raise "can't call S3 API. Please check your aws_key_id / aws_sec_key or s3_region configuration" end - def check_command(command, algo) - begin - Open3.capture3("#{command} -V") - rescue Errno::ENOENT - raise ConfigError, "'#{command}' utility must be in PATH for #{algo} compression" + class Compressor + include Configurable + + def configure(conf) + super end + + def ext + end + + def content_type + end + + def compress(chunk, tmp) + end + + private + + def check_command(command, algo = nil) + require 'open3' + + algo = command if algo.nil? + begin + Open3.capture3("#{command} -V") + rescue Errno::ENOENT + raise ConfigError, "'#{command}' utility must be in PATH for #{algo} compression" + end + end + end + + class GzipCompressor < Compressor + def ext + 'gz'.freeze + end + + def content_type + 'application/x-gzip'.freeze + end + + def compress(chunk, tmp) + w = Zlib::GzipWriter.new(tmp) + chunk.write_to(w) + w.close + ensure + w.close rescue nil + w.unlink rescue nil + end + end + + class TextCompressor < Compressor + def ext + 'txt'.freeze + end + + def content_type + 'text/plain'.freeze + end + + def compress(chunk, tmp) + chunk.write_to(tmp) + tmp.close + end + end + + class JsonCompressor < TextCompressor + def ext + 'json'.freeze + end + + def content_type + 'application/json'.freeze + end + end + + COMPRESSOR_REGISTRY = Registry.new(:s3_compressor_type, 'fluent/plugin/s3_compressor_') + { + 'gzip' => GzipCompressor, + 'json' => JsonCompressor, + 'text' => TextCompressor + }.each { |name, compressor| + COMPRESSOR_REGISTRY.register(name, compressor) + } + + def self.register_compressor(name, compressor) + COMPRESSOR_REGISTRY.register(name, compressor) end end end