lib/logstash/outputs/s3.rb in logstash-output-s3-3.1.2 vs lib/logstash/outputs/s3.rb in logstash-output-s3-3.2.0

- old
+ new

@@ -11,12 +11,12 @@ # INFORMATION: # # This plugin batches and uploads logstash events into Amazon Simple Storage Service (Amazon S3). -# -# Requirements: +# +# Requirements: # * Amazon S3 Bucket and S3 Access Permissions (Typically access_key_id and secret_access_key) # * S3 PutObject permission # * Run logstash as superuser to establish connection # # S3 outputs create temporary files into "/opt/logstash/S3_temp/". If you want, you can change the path at the start of register method. @@ -40,11 +40,11 @@ ##[Note regarding time_file and size_file] : # # Both time_file and size_file settings can trigger a log "file rotation" # A log rotation pushes the current log "part" to s3 and deleted from local temporary storage. # -## If you specify BOTH size_file and time_file then it will create file for each tag (if specified). +## If you specify BOTH size_file and time_file then it will create file for each tag (if specified). ## When EITHER time_file minutes have elapsed OR log file size > size_file, a log rotation is triggered. ## ## If you ONLY specify time_file but NOT file_size, one file for each tag (if specified) will be created.. ## When time_file minutes elapses, a log rotation will be triggered. # @@ -65,11 +65,11 @@ # region => "eu-west-1" (optional, default = "us-east-1") # bucket => "boss_please_open_your_bucket" (required) # size_file => 2048 (optional) - Bytes # time_file => 5 (optional) - Minutes # format => "plain" (optional) -# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" ) +# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read", "bucket_owner_full_control". Defaults to "private" ) # } # class LogStash::Outputs::S3 < LogStash::Outputs::Base include LogStash::PluginMixins::AwsConfig @@ -77,10 +77,12 @@ S3_INVALID_CHARACTERS = /[\^`><]/ config_name "s3" default :codec, 'line' + concurrency :single + # S3 bucket config :bucket, :validate => :string # Set the size of file in bytes, this means that files on bucket when have dimension > file_size, they are stored in two or more file. # If you have tags then it will generate a specific size file for every tags @@ -98,11 +100,11 @@ ## If you do not specify "restore => true" when logstash crashes or is restarted, the files are not sent into the bucket, ## for example if you have single Instance. config :restore, :validate => :boolean, :default => false # The S3 canned ACL to use when putting the file. Defaults to "private". - config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"], + config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read", "bucket_owner_full_control"], :default => "private" # Specifies wether or not to use S3's AES256 server side encryption. Defaults to false. config :server_side_encryption, :validate => :boolean, :default => false @@ -212,12 +214,10 @@ require "aws-sdk" # required if using ruby version < 2.0 # http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby AWS.eager_autoload!(AWS::S3) - workers_not_supported - @s3 = aws_s3_config @upload_queue = Queue.new @file_rotation_lock = Mutex.new if @prefix && @prefix =~ S3_INVALID_CHARACTERS @@ -256,11 +256,19 @@ file.write('test') end begin write_on_bucket(test_filename) - delete_on_bucket(test_filename) + + begin + remote_filename = "#{@prefix}#{File.basename(test_filename)}" + bucket = @s3.buckets[@bucket] + bucket.objects[remote_filename].delete + rescue StandardError => e + # we actually only need `put_object`, but if we dont delete them + # we can have a lot of tests files + end ensure File.delete(test_filename) end end @@ -462,27 +470,9 @@ end private def reset_page_counter @page_counter = 0 - end - - private - def delete_on_bucket(filename) - bucket = @s3.buckets[@bucket] - - remote_filename = "#{@prefix}#{File.basename(filename)}" - - @logger.debug("S3: delete file from bucket", :remote_filename => remote_filename, :bucket => @bucket) - - begin - # prepare for write the file - object = bucket.objects[remote_filename] - object.delete - rescue AWS::Errors::Base => e - @logger.error("S3: AWS error", :error => e) - raise LogStash::ConfigurationError, "AWS Configuration Error" - end end private def move_file_to_bucket_async(file) @logger.debug("S3: Sending the file to the upload queue.", :filename => File.basename(file))