lib/logstash/outputs/s3.rb in logstash-output-s3-3.0.1 vs lib/logstash/outputs/s3.rb in logstash-output-s3-3.1.1

- old
+ new

@@ -60,11 +60,11 @@
 # [source,ruby]
 # output {
 #    s3{
 #      access_key_id => "crazy_key" (required)
 #      secret_access_key => "monkey_access_key" (required)
-#      endpoint_region => "eu-west-1" (required) - Deprecated
+#      region => "eu-west-1" (optional, default = "us-east-1")
 #      bucket => "boss_please_open_your_bucket" (required)
 #      size_file => 2048 (optional) - Bytes
 #      time_file => 5 (optional) - Minutes
 #      format => "plain" (optional)
 #      canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
@@ -80,15 +80,10 @@
 default :codec, 'line'

 # S3 bucket
 config :bucket, :validate => :string

-# AWS endpoint_region
-config :endpoint_region, :validate => ["us-east-1", "us-west-1", "us-west-2",
-                                       "eu-west-1", "ap-southeast-1", "ap-southeast-2",
-                                       "ap-northeast-1", "sa-east-1", "us-gov-west-1"], :deprecated => 'Deprecated, use region instead.'
-
 # Set the size of file in bytes, this means that files on bucket when have dimension > file_size, they are stored in two or more file.
 # If you have tags then it will generate a specific size file for every tags
 ##NOTE: define size of file is the better thing, because generate a local temporary file on disk and then put it in bucket.
 config :size_file, :validate => :number, :default => 0
@@ -133,10 +128,13 @@
 # Will generate this file:
 # "ls.s3.logstash.local.2015-01-01T00.00.tag_elasticsearch.logstash.kibana.part0.txt"
 #
 config :tags, :validate => :array, :default => []

+# Specify the content encoding. Supports ("gzip"). Defaults to "none"
+config :encoding, :validate => ["none", "gzip"], :default => "none"
+
 # Exposed attributes for testing purpose.
 attr_accessor :tempfile
 attr_reader :page_counter
 attr_reader :s3
@@ -156,21 +154,12 @@
     {}
   end
 end

 def aws_service_endpoint(region)
-  # Make the deprecated endpoint_region work
-  # TODO: (ph) Remove this after deprecation.
-
-  if @endpoint_region
-    region_to_use = @endpoint_region
-  else
-    region_to_use = @region
-  end
-
   return {
-    :s3_endpoint => region_to_use == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region_to_use}.amazonaws.com"
+    :s3_endpoint => region == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region}.amazonaws.com"
   }
 end

 public
 def write_on_bucket(file)
@@ -183,11 +172,14 @@
   File.open(file, 'r') do |fileIO|
     begin
       # prepare for write the file
       object = bucket.objects[remote_filename]

-      object.write(fileIO, :acl => @canned_acl, :server_side_encryption => @server_side_encryption ? :aes256 : nil)
+      object.write(fileIO,
+                   :acl => @canned_acl,
+                   :server_side_encryption => @server_side_encryption ? :aes256 : nil,
+                   :content_encoding => @encoding == "gzip" ? "gzip" : nil)
     rescue AWS::Errors::Base => error
       @logger.error("S3: AWS error", :error => error)
       raise LogStash::Error, "AWS Configuration Error, #{error}"
     end
   end
@@ -205,11 +197,15 @@
   @file_rotation_lock.synchronize do
     unless @tempfile.nil?
       @tempfile.close
     end

-    @tempfile = File.open(filename, "a")
+    if @encoding == "gzip"
+      @tempfile = Zlib::GzipWriter.open(filename)
+    else
+      @tempfile = File.open(filename, "a")
+    end
   end
 end

 public
 def register
@@ -270,11 +266,11 @@
 public
 def restore_from_crashes
   @logger.debug("S3: Checking for temp files from a previoius crash...")

-  Dir[File.join(@temporary_directory, "*.#{TEMPFILE_EXTENSION}")].each do |file|
+  Dir[File.join(@temporary_directory, "*.#{get_tempfile_extension}")].each do |file|
     name_file = File.basename(file)
     @logger.warn("S3: Found temporary file from crash. Uploading file to S3.", :filename => name_file)
     move_file_to_bucket_async(file)
   end
 end
@@ -299,19 +295,24 @@
 public
 def periodic_interval
   @time_file * 60
 end

+private
+def get_tempfile_extension
+  @encoding == "gzip" ? "#{TEMPFILE_EXTENSION}.gz" : "#{TEMPFILE_EXTENSION}"
+end
+
 public
 def get_temporary_filename(page_counter = 0)
   current_time = Time.now
   filename = "ls.s3.#{Socket.gethostname}.#{current_time.strftime("%Y-%m-%dT%H.%M")}"

   if @tags.size > 0
-    return "#{filename}.tag_#{@tags.join('.')}.part#{page_counter}.#{TEMPFILE_EXTENSION}"
+    return "#{filename}.tag_#{@tags.join('.')}.part#{page_counter}.#{get_tempfile_extension}"
   else
-    return "#{filename}.part#{page_counter}.#{TEMPFILE_EXTENSION}"
+    return "#{filename}.part#{page_counter}.#{get_tempfile_extension}"
   end
 end

 public
 def receive(event)
@@ -320,26 +321,37 @@
 end

 public
 def rotate_events_log?
   @file_rotation_lock.synchronize do
-    @tempfile.size > @size_file
+    tempfile_size > @size_file
   end
 end

+private
+def tempfile_size
+  if @tempfile.instance_of? File
+    @tempfile.size
+  elsif @tempfile.instance_of? Zlib::GzipWriter
+    @tempfile.tell
+  else
+    raise LogStash::Error, "Unable to get size of temp file of type #{@tempfile.class}"
+  end
+end
+
 public
 def write_events_to_multiple_files?
   @size_file > 0
 end

 public
 def write_to_tempfile(event)
   begin
-    @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile))
+    @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile.path))

     @file_rotation_lock.synchronize do
-      @tempfile.syswrite(event)
+      @tempfile.write(event)
     end
   rescue Errno::ENOSPC
     @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
     close
   end
@@ -363,17 +375,21 @@
 private
 def handle_event(encoded_event)
   if write_events_to_multiple_files?
     if rotate_events_log?
-      @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile))
+      @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile.path))

-      move_file_to_bucket_async(@tempfile.path)
+      tempfile_path = @tempfile.path
+      # close and start next file before sending the previous one
       next_page
       create_temporary_file
+
+      # send to s3
+      move_file_to_bucket_async(tempfile_path)
     else
-      @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file)
+      @logger.debug("S3: tempfile file size report.", :tempfile_size => tempfile_size, :size_file => @size_file)
     end
   end

   write_to_tempfile(encoded_event)
 end
@@ -384,12 +400,16 @@
       LogStash::Util::set_thread_name("<S3 periodic uploader")

       Stud.interval(periodic_interval, :sleep_then_run => true) do
         @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path)

-        move_file_to_bucket_async(@tempfile.path)
+        tempfile_path = @tempfile.path
+        # close and start next file before sending the previous one
         next_page
         create_temporary_file
+
+        # send to s3
+        move_file_to_bucket_async(tempfile_path)
       end
     end
   end

 private
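
For orientation, the net effect of this diff on a pipeline configuration looks roughly like the sketch below. It is illustrative, not copied from the plugin docs: the credentials, bucket name, and values are placeholders taken from the doc-comment example above; only the region and encoding options shown are new or changed in 3.1.1 (endpoint_region is removed).

  output {
    s3 {
      access_key_id => "crazy_key"                # placeholder credentials
      secret_access_key => "monkey_access_key"
      region => "eu-west-1"                       # replaces the removed endpoint_region (default "us-east-1")
      bucket => "boss_please_open_your_bucket"
      size_file => 2048                           # rotate the tempfile after ~2 KB
      time_file => 5                              # and/or every 5 minutes
      encoding => "gzip"                          # new in 3.1.1; defaults to "none"
    }
  }

With encoding => "gzip", the new code writes events through Zlib::GzipWriter, names tempfiles *.txt.gz, measures rotation size with GzipWriter#tell (bytes fed to the writer, i.e. uncompressed size) instead of File#size, and uploads the object with its Content-Encoding metadata set to gzip.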