lib/logstash/outputs/s3.rb in logstash-output-s3-3.0.1 vs lib/logstash/outputs/s3.rb in logstash-output-s3-3.1.1
- old
+ new
@@ -60,11 +60,11 @@
# [source,ruby]
# output {
# s3{
# access_key_id => "crazy_key" (required)
# secret_access_key => "monkey_access_key" (required)
-# endpoint_region => "eu-west-1" (required) - Deprecated
+# region => "eu-west-1" (optional, default = "us-east-1")
# bucket => "boss_please_open_your_bucket" (required)
# size_file => 2048 (optional) - Bytes
# time_file => 5 (optional) - Minutes
# format => "plain" (optional)
# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
@@ -80,15 +80,10 @@
default :codec, 'line'
# S3 bucket
config :bucket, :validate => :string
- # AWS endpoint_region
- config :endpoint_region, :validate => ["us-east-1", "us-west-1", "us-west-2",
- "eu-west-1", "ap-southeast-1", "ap-southeast-2",
- "ap-northeast-1", "sa-east-1", "us-gov-west-1"], :deprecated => 'Deprecated, use region instead.'
-
# Set the file size in bytes. Once the data written exceeds size_file, it is split across two or more files on the bucket.
# If you use tags, a file of that size is generated for every tag combination.
##NOTE: setting a file size is recommended, because the plugin builds a local temporary file on disk and then puts it in the bucket.
config :size_file, :validate => :number, :default => 0
@@ -133,10 +128,13 @@
# Will generate this file:
# "ls.s3.logstash.local.2015-01-01T00.00.tag_elasticsearch.logstash.kibana.part0.txt"
#
config :tags, :validate => :array, :default => []
+ # Specify the content encoding. Supports ("gzip"). Defaults to "none"
+ config :encoding, :validate => ["none", "gzip"], :default => "none"
+
# Exposed attributes for testing purposes.
attr_accessor :tempfile
attr_reader :page_counter
attr_reader :s3
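
The new encoding option slots into the same pipeline configuration shown at the top of the file; a minimal sketch, reusing the placeholder credentials and bucket from that example (values are illustrative, not requirements):

    output {
      s3 {
        access_key_id     => "crazy_key"
        secret_access_key => "monkey_access_key"
        region            => "eu-west-1"
        bucket            => "boss_please_open_your_bucket"
        size_file         => 2048
        time_file         => 5
        encoding          => "gzip"   # new in 3.1.1; "none" (the default) keeps plain-text files
      }
    }
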
@@ -156,21 +154,12 @@
{}
end
end
def aws_service_endpoint(region)
- # Make the deprecated endpoint_region work
- # TODO: (ph) Remove this after deprecation.
-
- if @endpoint_region
- region_to_use = @endpoint_region
- else
- region_to_use = @region
- end
-
return {
- :s3_endpoint => region_to_use == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region_to_use}.amazonaws.com"
+ :s3_endpoint => region == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region}.amazonaws.com"
}
end
public
def write_on_bucket(file)
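
With the deprecated endpoint_region gone, the endpoint lookup becomes a pure function of region. A standalone sketch of the same mapping (the helper name here is made up for illustration):

    def s3_endpoint_for(region)
      region == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region}.amazonaws.com"
    end

    s3_endpoint_for('us-east-1')  # => "s3.amazonaws.com"
    s3_endpoint_for('eu-west-1')  # => "s3-eu-west-1.amazonaws.com"
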
@@ -183,11 +172,14 @@
File.open(file, 'r') do |fileIO|
begin
# prepare for write the file
object = bucket.objects[remote_filename]
- object.write(fileIO, :acl => @canned_acl, :server_side_encryption => @server_side_encryption ? :aes256 : nil)
+ object.write(fileIO,
+ :acl => @canned_acl,
+ :server_side_encryption => @server_side_encryption ? :aes256 : nil,
+ :content_encoding => @encoding == "gzip" ? "gzip" : nil)
rescue AWS::Errors::Base => error
@logger.error("S3: AWS error", :error => error)
raise LogStash::Error, "AWS Configuration Error, #{error}"
end
end
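
Passing :content_encoding through object.write stores it as object metadata, so S3 serves gzipped uploads with a Content-Encoding: gzip response header and HTTP clients can decompress them transparently; for plain output the value stays nil and nothing changes. Illustrative only, assuming the default canned_acl and server-side encryption left off, the option hash works out to:

    # encoding => "gzip"
    { :acl => "private", :server_side_encryption => nil, :content_encoding => "gzip" }
    # encoding => "none"
    { :acl => "private", :server_side_encryption => nil, :content_encoding => nil }
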
@@ -205,11 +197,15 @@
@file_rotation_lock.synchronize do
unless @tempfile.nil?
@tempfile.close
end
- @tempfile = File.open(filename, "a")
+ if @encoding == "gzip"
+ @tempfile = Zlib::GzipWriter.open(filename)
+ else
+ @tempfile = File.open(filename, "a")
+ end
end
end
public
def register
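
create_temporary_file now hands back either a plain File or a Zlib::GzipWriter, and the rest of the plugin treats the two as interchangeable writers. A self-contained sketch of both paths (the paths are made up; only the classes and calls mirror the diff):

    require 'zlib'

    # Plain path: append to the temp file, exactly as before.
    plain = File.open("/tmp/ls.s3.example.part0.txt", "a")
    plain.write("log line\n")
    plain.close

    # Gzip path: data is compressed as it is written; closing the writer
    # flushes the gzip trailer, which is why a tempfile must be closed
    # before it is uploaded.
    gz = Zlib::GzipWriter.open("/tmp/ls.s3.example.part0.txt.gz")
    gz.write("log line\n")
    gz.close
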
@@ -270,11 +266,11 @@
public
def restore_from_crashes
@logger.debug("S3: Checking for temp files from a previoius crash...")
- Dir[File.join(@temporary_directory, "*.#{TEMPFILE_EXTENSION}")].each do |file|
+ Dir[File.join(@temporary_directory, "*.#{get_tempfile_extension}")].each do |file|
name_file = File.basename(file)
@logger.warn("S3: Found temporary file from crash. Uploading file to S3.", :filename => name_file)
move_file_to_bucket_async(file)
end
end
@@ -299,19 +295,24 @@
public
def periodic_interval
@time_file * 60
end
+ private
+ def get_tempfile_extension
+ @encoding == "gzip" ? "#{TEMPFILE_EXTENSION}.gz" : "#{TEMPFILE_EXTENSION}"
+ end
+
public
def get_temporary_filename(page_counter = 0)
current_time = Time.now
filename = "ls.s3.#{Socket.gethostname}.#{current_time.strftime("%Y-%m-%dT%H.%M")}"
if @tags.size > 0
- return "#{filename}.tag_#{@tags.join('.')}.part#{page_counter}.#{TEMPFILE_EXTENSION}"
+ return "#{filename}.tag_#{@tags.join('.')}.part#{page_counter}.#{get_tempfile_extension}"
else
- return "#{filename}.part#{page_counter}.#{TEMPFILE_EXTENSION}"
+ return "#{filename}.part#{page_counter}.#{get_tempfile_extension}"
end
end
public
def receive(event)
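
The extension helper only changes the suffix, so gzip output keeps the naming pattern from the example earlier in the file with .gz appended; roughly:

    # encoding => "none"
    "ls.s3.logstash.local.2015-01-01T00.00.tag_elasticsearch.logstash.kibana.part0.txt"
    # encoding => "gzip"
    "ls.s3.logstash.local.2015-01-01T00.00.tag_elasticsearch.logstash.kibana.part0.txt.gz"
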
@@ -320,26 +321,37 @@
end
public
def rotate_events_log?
@file_rotation_lock.synchronize do
- @tempfile.size > @size_file
+ tempfile_size > @size_file
end
end
+ private
+ def tempfile_size
+ if @tempfile.instance_of? File
+ @tempfile.size
+ elsif @tempfile.instance_of? Zlib::GzipWriter
+ @tempfile.tell
+ else
+ raise LogStash::Error, "Unable to get size of temp file of type #{@tempfile.class}"
+ end
+ end
+
public
def write_events_to_multiple_files?
@size_file > 0
end
public
def write_to_tempfile(event)
begin
- @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile))
+ @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile.path))
@file_rotation_lock.synchronize do
- @tempfile.syswrite(event)
+ @tempfile.write(event)
end
rescue Errno::ENOSPC
@logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
close
end
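
Zlib::GzipWriter does not respond to size (it is not an IO subclass), hence the type check: File#size reads the on-disk size, while the gzip writer's tell reports its stream position; the switch from syswrite to write likewise keeps to a call both classes implement. A minimal standalone version of the dispatch (names are illustrative, the checks mirror the diff):

    require 'zlib'

    def io_size(io)
      case io
      when Zlib::GzipWriter then io.tell   # position in the gzip stream
      when File             then io.size   # plain file size on disk
      else raise "Unable to get size of temp file of type #{io.class}"
      end
    end
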
@@ -363,17 +375,21 @@
private
def handle_event(encoded_event)
if write_events_to_multiple_files?
if rotate_events_log?
- @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile))
+ @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile.path))
- move_file_to_bucket_async(@tempfile.path)
+ tempfile_path = @tempfile.path
+ # close and start next file before sending the previous one
next_page
create_temporary_file
+
+ # send to s3
+ move_file_to_bucket_async(tempfile_path)
else
- @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file)
+ @logger.debug("S3: tempfile file size report.", :tempfile_size => tempfile_size, :size_file => @size_file)
end
end
write_to_tempfile(encoded_event)
end
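
Both this hunk and the time-based rotation below it apply the same reordering: remember the path of the file that is full, rotate to a fresh tempfile (which closes the old writer and, for gzip, flushes its trailer), and only then queue the finished file for upload. A sketch of the sequence using the method names from the diff (the surrounding instance state is assumed):

    tempfile_path = @tempfile.path            # the file that hit size_file or time_file
    next_page                                 # advance the part counter
    create_temporary_file                     # closes the old writer, opens the next file
    move_file_to_bucket_async(tempfile_path)  # upload the closed, complete file
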
@@ -384,12 +400,16 @@
LogStash::Util::set_thread_name("<S3 periodic uploader")
Stud.interval(periodic_interval, :sleep_then_run => true) do
@logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path)
- move_file_to_bucket_async(@tempfile.path)
+ tempfile_path = @tempfile.path
+ # close and start next file before sending the previous one
next_page
create_temporary_file
+
+ # send to s3
+ move_file_to_bucket_async(tempfile_path)
end
end
end
private