lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.3.0 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.3.1

- old
+ new

@@ -27,10 +27,11 @@ config_param :aws_key_id, :string, :default => nil config_param :aws_sec_key, :string, :default => nil config_param :s3_bucket, :string config_param :s3_endpoint, :string, :default => nil config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}" + config_param :store_as, :string, :default => "gzip" config_param :auto_create_bucket, :bool, :default => true attr_reader :bucket include Fluent::Mixin::ConfigPlaceholders @@ -57,10 +58,16 @@ raise ConfigError, "'true' or 'false' is required for use_ssl option on s3 output" end end end + @ext, @mime_type = case @store_as + when 'gzip' then ['gz', 'application/x-gzip'] + when 'json' then ['json', 'application/json'] + else ['txt', 'text/plain'] + end + @timef = TimeFormatter.new(@time_format, @localtime) end def start super @@ -99,28 +106,34 @@ end end def write(chunk) i = 0 + begin values_for_s3_object_key = { "path" => @path, "time_slice" => chunk.key, - "file_extension" => "gz", + "file_extension" => @ext, "index" => i } s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr| values_for_s3_object_key[expr[2...expr.size-1]] } i += 1 end while @bucket.objects[s3path].exists? tmp = Tempfile.new("s3-") - w = Zlib::GzipWriter.new(tmp) begin - chunk.write_to(w) - w.close - @bucket.objects[s3path].write(Pathname.new(tmp.path), :content_type => 'application/x-gzip') + if @store_as == "gzip" + w = Zlib::GzipWriter.new(tmp) + chunk.write_to(w) + w.close + else + chunk.write_to(tmp) + tmp.close + end + @bucket.objects[s3path].write(Pathname.new(tmp.path), :content_type => @mime_type) ensure tmp.close(true) rescue nil w.close rescue nil end end