lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.3.0 vs lib/fluent/plugin/out_s3.rb in fluent-plugin-s3-0.3.1
- old
+ new
@@ -27,10 +27,11 @@
config_param :aws_key_id, :string, :default => nil
config_param :aws_sec_key, :string, :default => nil
config_param :s3_bucket, :string
config_param :s3_endpoint, :string, :default => nil
config_param :s3_object_key_format, :string, :default => "%{path}%{time_slice}_%{index}.%{file_extension}"
+ config_param :store_as, :string, :default => "gzip"
config_param :auto_create_bucket, :bool, :default => true
attr_reader :bucket
include Fluent::Mixin::ConfigPlaceholders
@@ -57,10 +58,16 @@
raise ConfigError, "'true' or 'false' is required for use_ssl option on s3 output"
end
end
end
+ @ext, @mime_type = case @store_as
+ when 'gzip' then ['gz', 'application/x-gzip']
+ when 'json' then ['json', 'application/json']
+ else ['txt', 'text/plain']
+ end
+
@timef = TimeFormatter.new(@time_format, @localtime)
end
def start
super
@@ -99,28 +106,34 @@
end
end
def write(chunk)
i = 0
+
begin
values_for_s3_object_key = {
"path" => @path,
"time_slice" => chunk.key,
- "file_extension" => "gz",
+ "file_extension" => @ext,
"index" => i
}
s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) { |expr|
values_for_s3_object_key[expr[2...expr.size-1]]
}
i += 1
end while @bucket.objects[s3path].exists?
tmp = Tempfile.new("s3-")
- w = Zlib::GzipWriter.new(tmp)
begin
- chunk.write_to(w)
- w.close
- @bucket.objects[s3path].write(Pathname.new(tmp.path), :content_type => 'application/x-gzip')
+ if @store_as == "gzip"
+ w = Zlib::GzipWriter.new(tmp)
+ chunk.write_to(w)
+ w.close
+ else
+ chunk.write_to(tmp)
+ tmp.close
+ end
+ @bucket.objects[s3path].write(Pathname.new(tmp.path), :content_type => @mime_type)
ensure
tmp.close(true) rescue nil
w.close rescue nil
end
end