lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.3.7 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.4.0

- old
+ new

@@ -1,219 +1,193 @@
 module Fluent
+  require 'fluent/mixin/config_placeholders'
-require 'fluent/mixin/config_placeholders'
+  class S3Output < Fluent::TimeSlicedOutput
+    Fluent::Plugin.register_output('s3', self)
-class S3Output < Fluent::TimeSlicedOutput
-  Fluent::Plugin.register_output('s3', self)
+    unless method_defined?(:log)
+      define_method(:log) { $log }
+    end
-  unless method_defined?(:log)
-    define_method(:log) { $log }
-  end
+    def initialize
+      super
+      require 'aws-sdk'
+      require 'zlib'
+      require 'time'
+      require 'tempfile'
+      require 'open3'
-  def initialize
-    super
-    require 'aws-sdk'
-    require 'zlib'
-    require 'time'
-    require 'tempfile'
-    require 'open3'
+      @use_ssl = true
+    end
-    @use_ssl = true
-  end
+    config_param :path, :string, :default => ""
-  config_param :path, :string, :default => ""
-  config_param :time_format, :string, :default => nil
+    config_param :aws_key_id, :string, :default => nil
+    config_param :aws_sec_key, :string, :default => nil
+    config_param :s3_bucket, :string
+    config_param :s3_endpoint, :string, :default => nil
+    config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}"
+    config_param :store_as, :string, :default => "gzip"
+    config_param :command_parameter, :string, :default => nil
+    config_param :auto_create_bucket, :bool, :default => true
+    config_param :check_apikey_on_start, :bool, :default => true
+    config_param :proxy_uri, :string, :default => nil
+    config_param :reduced_redundancy, :bool, :default => false
+    config_param :format, :string, :default => 'out_file'
-  include SetTagKeyMixin
-  config_set_default :include_tag_key, false
+    attr_reader :bucket
-  include SetTimeKeyMixin
-  config_set_default :include_time_key, false
+    include Fluent::Mixin::ConfigPlaceholders
-  config_param :aws_key_id, :string, :default => nil
-  config_param :aws_sec_key, :string, :default => nil
-  config_param :s3_bucket, :string
-  config_param :s3_endpoint, :string, :default => nil
-  config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}"
-  config_param :store_as, :string, :default => "gzip"
-  config_param :command_parameter, :string, :default => nil
-  config_param :auto_create_bucket, :bool, :default => true
-  config_param :check_apikey_on_start, :bool, :default => true
-  config_param :proxy_uri, :string, :default => nil
-  config_param :reduced_redundancy, :bool, :default => false
+    def placeholders
+      [:percent]
+    end
-  attr_reader :bucket
+    def configure(conf)
+      super
-  include Fluent::Mixin::ConfigPlaceholders
+      if use_ssl = conf['use_ssl']
+        if use_ssl.empty?
+          @use_ssl = true
+        else
+          @use_ssl = Config.bool_value(use_ssl)
+          if @use_ssl.nil?
+            raise ConfigError, "'true' or 'false' is required for use_ssl option on s3 output"
+          end
+        end
+      end
-  def placeholders
-    [:percent]
-  end
+      @ext, @mime_type = case @store_as
+        when 'gzip'
+          ['gz', 'application/x-gzip']
+        when 'lzo'
+          check_command('lzop', 'LZO')
+          @command_parameter = '-qf1' if @command_parameter.nil?
+          ['lzo', 'application/x-lzop']
+        when 'lzma2'
+          check_command('xz', 'LZMA2')
+          @command_parameter = '-qf0' if @command_parameter.nil?
+          ['xz', 'application/x-xz']
+        when 'json'
+          ['json', 'application/json']
+        else
+          ['txt', 'text/plain']
+        end
-  def configure(conf)
-    super
+      if format_json = conf['format_json']
+        $log.warn "format_json is deprecated. Use 'format json' instead"
+        conf['format'] = 'json'
+      else
+        conf['format'] = @format
+      end
+      @formatter = TextFormatter.create(conf)
-    if format_json = conf['format_json']
-      @format_json = true
-    else
-      @format_json = false
-    end
-
-    if use_ssl = conf['use_ssl']
-      if use_ssl.empty?
-        @use_ssl = true
+      if @localtime
+        @path_slicer = Proc.new {|path|
+          Time.now.strftime(path)
+        }
       else
-        @use_ssl = Config.bool_value(use_ssl)
-        if @use_ssl.nil?
-          raise ConfigError, "'true' or 'false' is required for use_ssl option on s3 output"
-        end
+        @path_slicer = Proc.new {|path|
+          Time.now.utc.strftime(path)
+        }
       end
     end
-    @ext, @mime_type = case @store_as
-      when 'gzip'
-        ['gz', 'application/x-gzip']
-      when 'lzo'
-        check_command('lzop', 'LZO')
-        @command_parameter = '-qf1' if @command_parameter.nil?
-        ['lzo', 'application/x-lzop']
-      when 'lzma2'
-        check_command('xz', 'LZMA2')
-        @command_parameter = '-qf0' if @command_parameter.nil?
-        ['xz', 'application/x-xz']
-      when 'json'
-        ['json', 'application/json']
-      else
-        ['txt', 'text/plain']
-      end
+    def start
+      super
+      options = {}
+      if @aws_key_id && @aws_sec_key
+        options[:access_key_id] = @aws_key_id
+        options[:secret_access_key] = @aws_sec_key
+      end
+      options[:s3_endpoint] = @s3_endpoint if @s3_endpoint
+      options[:proxy_uri] = @proxy_uri if @proxy_uri
+      options[:use_ssl] = @use_ssl
-    @timef = TimeFormatter.new(@time_format, @localtime)
+      @s3 = AWS::S3.new(options)
+      @bucket = @s3.buckets[@s3_bucket]
-    if @localtime
-      @path_slicer = Proc.new {|path|
-        Time.now.strftime(path)
-      }
-    else
-      @path_slicer = Proc.new {|path|
-        Time.now.utc.strftime(path)
-      }
+      check_apikeys if @check_apikey_on_start
+      ensure_bucket
     end
-  end
-
-  def start
-    super
-    options = {}
-    if @aws_key_id && @aws_sec_key
-      options[:access_key_id] = @aws_key_id
-      options[:secret_access_key] = @aws_sec_key
+    def format(tag, time, record)
+      @formatter.format(tag, time, record)
     end
-    options[:s3_endpoint] = @s3_endpoint if @s3_endpoint
-    options[:proxy_uri] = @proxy_uri if @proxy_uri
-    options[:use_ssl] = @use_ssl
-    @s3 = AWS::S3.new(options)
-    @bucket = @s3.buckets[@s3_bucket]
+    def write(chunk)
+      i = 0
-    check_apikeys if @check_apikey_on_start
-    ensure_bucket
-  end
+      begin
+        path = @path_slicer.call(@path)
+        values_for_s3_object_key = {
+          "path" => path,
+          "time_slice" => chunk.key,
+          "file_extension" => @ext,
+          "index" => i
+        }
+        s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
+          values_for_s3_object_key[expr[2...expr.size-1]]
+        }
+        i += 1
+      end while @bucket.objects[s3path].exists?
-  def format(tag, time, record)
-    if @include_time_key || !@format_json
-      time_str = @timef.format(time)
+      tmp = Tempfile.new("s3-")
+      begin
+        if @store_as == "gzip"
+          w = Zlib::GzipWriter.new(tmp)
+          chunk.write_to(w)
+          w.close
+        elsif @store_as == "lzo"
+          w = Tempfile.new("chunk-tmp")
+          chunk.write_to(w)
+          w.close
+          tmp.close
+          # We don't check the return code because we can't recover lzop failure.
+          system "lzop #{@command_parameter} -o #{tmp.path} #{w.path}"
+        elsif @store_as == "lzma2"
+          w = Tempfile.new("chunk-xz-tmp")
+          chunk.write_to(w)
+          w.close
+          tmp.close
+          system "xz #{@command_parameter} -c #{w.path} > #{tmp.path}"
+        else
+          chunk.write_to(tmp)
+          tmp.close
+        end
+        @bucket.objects[s3path].write(Pathname.new(tmp.path), {:content_type => @mime_type,
+                                                               :reduced_redundancy => @reduced_redundancy})
+      ensure
+        tmp.close(true) rescue nil
+        w.close rescue nil
+        w.unlink rescue nil
+      end
     end
-    # copied from each mixin because current TimeSlicedOutput can't support mixins.
-    if @include_tag_key
-      record[@tag_key] = tag
-    end
-    if @include_time_key
-      record[@time_key] = time_str
-    end
+    private
-    if @format_json
-      Yajl.dump(record) + "\n"
-    else
-      "#{time_str}\t#{tag}\t#{Yajl.dump(record)}\n"
+    def ensure_bucket
+      if !@bucket.exists?
+        if @auto_create_bucket
+          log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
+          @s3.buckets.create(@s3_bucket)
+        else
+          raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
+        end
+      end
     end
-  end
-  def write(chunk)
-    i = 0
-
-    begin
-      path = @path_slicer.call(@path)
-      values_for_s3_object_key = {
-        "path" => path,
-        "time_slice" => chunk.key,
-        "file_extension" => @ext,
-        "index" => i
-      }
-      s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
-        values_for_s3_object_key[expr[2...expr.size-1]]
-      }
-      i += 1
-    end while @bucket.objects[s3path].exists?
-
-    tmp = Tempfile.new("s3-")
-    begin
-      if @store_as == "gzip"
-        w = Zlib::GzipWriter.new(tmp)
-        chunk.write_to(w)
-        w.close
-      elsif @store_as == "lzo"
-        w = Tempfile.new("chunk-tmp")
-        chunk.write_to(w)
-        w.close
-        tmp.close
-        # We don't check the return code because we can't recover lzop failure.
-        system "lzop #{@command_parameter} -o #{tmp.path} #{w.path}"
-      elsif @store_as == "lzma2"
-        w = Tempfile.new("chunk-xz-tmp")
-        chunk.write_to(w)
-        w.close
-        tmp.close
-        system "xz #{@command_parameter} -c #{w.path} > #{tmp.path}"
-      else
-        chunk.write_to(tmp)
-        tmp.close
-      end
-      @bucket.objects[s3path].write(Pathname.new(tmp.path), {:content_type => @mime_type,
-                                                             :reduced_redundancy => @reduced_redundancy})
-    ensure
-      tmp.close(true) rescue nil
-      w.close rescue nil
-      w.unlink rescue nil
+    def check_apikeys
+      @bucket.empty?
+    rescue
+      raise "aws_key_id or aws_sec_key is invalid. Please check your configuration"
     end
-  end
-  private
-
-  def ensure_bucket
-    if !@bucket.exists?
-      if @auto_create_bucket
-        log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
-        @s3.buckets.create(@s3_bucket)
-      else
-        raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
+    def check_command(command, algo)
+      begin
+        Open3.capture3("#{command} -V")
+      rescue Errno::ENOENT
+        raise ConfigError, "'#{command}' utility must be in PATH for #{algo} compression"
+      end
-      end
     end
   end
-
-  def check_apikeys
-    @bucket.empty?
-  rescue
-    raise "aws_key_id or aws_sec_key is invalid. Please check your configuration"
-  end
-
-  def check_command(command, algo)
-    begin
-      Open3.capture3("#{command} -V")
-    rescue Errno::ENOENT
-      raise ConfigError, "'#{command}' utility must be in PATH for #{algo} compression"
-    end
-  end
-end
-
 end
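
The user-visible change in 0.4.0 is the move from the boolean format_json option to a generic format parameter (default out_file), which configure now hands to Fluentd's TextFormatter.create instead of building output lines by hand. A minimal sketch of the new configuration style; the match pattern, credentials, and bucket name below are placeholders:

  <match s3.**>
    type s3
    aws_key_id YOUR_AWS_KEY_ID       # placeholder
    aws_sec_key YOUR_AWS_SECRET_KEY  # placeholder
    s3_bucket example-log-bucket     # placeholder
    path logs/
    format json                      # replaces the deprecated "format_json true"
  </match>

A configuration that still sets format_json true keeps working, but configure logs "format_json is deprecated. Use 'format json' instead" and treats it as format json; the default format out_file matches the tab-separated time/tag/JSON lines that 0.3.7 emitted by hand. With the default s3_object_key_format, the first gzip object written for a daily time slice such as 20130111 under path logs/ would be keyed logs/20130111_0.gz, the index incrementing in write until an unused key is found.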