lib/logstash/outputs/s3.rb in logstash-output-s3-3.1.2 vs lib/logstash/outputs/s3.rb in logstash-output-s3-3.2.0
- old
+ new
@@ -11,12 +11,12 @@
# INFORMATION:
#
# This plugin batches and uploads logstash events into Amazon Simple Storage Service (Amazon S3).
-#
-# Requirements:
+#
+# Requirements:
# * Amazon S3 Bucket and S3 Access Permissions (Typically access_key_id and secret_access_key)
# * S3 PutObject permission
# * Run logstash as superuser to establish connection
#
# S3 outputs create temporary files into "/opt/logstash/S3_temp/". If you want, you can change the path at the start of the register method.
@@ -40,11 +40,11 @@
##[Note regarding time_file and size_file] :
#
# Both time_file and size_file settings can trigger a log "file rotation".
# A log rotation pushes the current log "part" to S3 and deletes it from local temporary storage.
#
-## If you specify BOTH size_file and time_file then it will create file for each tag (if specified).
+## If you specify BOTH size_file and time_file then it will create file for each tag (if specified).
## When EITHER time_file minutes have elapsed OR log file size > size_file, a log rotation is triggered.
##
## If you ONLY specify time_file but NOT size_file, one file for each tag (if specified) will be created.
## When time_file minutes elapse, a log rotation will be triggered.
#
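In other words, the trigger is a plain OR of the two conditions. A minimal sketch of that check, using hypothetical helper and variable names rather than the plugin's actual code:

    # Illustrative rotation check: `opened_at` is when the current part was
    # created; time_file is in minutes, size_file in bytes.
    def rotation_needed?(file, opened_at, time_file, size_file)
      time_up = time_file > 0 && (Time.now - opened_at) >= time_file * 60
      size_up = size_file > 0 && File.size(file) > size_file
      time_up || size_up  # EITHER condition triggers a rotation
    end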
@@ -65,11 +65,11 @@
# region => "eu-west-1" (optional, default = "us-east-1")
# bucket => "boss_please_open_your_bucket" (required)
# size_file => 2048 (optional) - Bytes
# time_file => 5 (optional) - Minutes
# format => "plain" (optional)
-# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
+# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read", "bucket_owner_full_control". Defaults to "private" )
# }
#
class LogStash::Outputs::S3 < LogStash::Outputs::Base
include LogStash::PluginMixins::AwsConfig
@@ -77,10 +77,12 @@
S3_INVALID_CHARACTERS = /[\^`><]/
config_name "s3"
default :codec, 'line'
+ concurrency :single
+
# S3 bucket
config :bucket, :validate => :string
# Set the size of file in bytes; once the data written exceeds size_file, it is stored on the bucket as two or more files.
# If you have tags, a separate file of this size is generated for every tag.
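For example, with size_file => 2048, a tag that accumulates 5 KB of events lands in the bucket as three parts of roughly 2 KB each rather than as a single file.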
@@ -98,11 +100,11 @@
## If you do not specify "restore => true", files left behind when logstash crashes or is restarted are not sent to the bucket,
## for example if you run a single instance.
config :restore, :validate => :boolean, :default => false
# The S3 canned ACL to use when putting the file. Defaults to "private".
- config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"],
+ config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read", "bucket_owner_full_control"],
:default => "private"
# Specifies whether or not to use S3's AES256 server-side encryption. Defaults to false.
config :server_side_encryption, :validate => :boolean, :default => false
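"bucket_owner_full_control" is the canned ACL for writing into a bucket owned by another AWS account: the bucket owner is granted full control of each uploaded object. A minimal output block combining it with server-side encryption might look like this (credentials and bucket name are placeholders):

    output {
      s3 {
        access_key_id => "crazy_key"              # placeholder
        secret_access_key => "monkey_access_key"  # placeholder
        bucket => "someone-elses-bucket"          # placeholder
        canned_acl => "bucket_owner_full_control"
        server_side_encryption => true
      }
    }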
@@ -212,12 +214,10 @@
require "aws-sdk"
# required if using ruby version < 2.0
# http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby
AWS.eager_autoload!(AWS::S3)
- workers_not_supported
-
@s3 = aws_s3_config
@upload_queue = Queue.new
@file_rotation_lock = Mutex.new
if @prefix && @prefix =~ S3_INVALID_CHARACTERS
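For context, `concurrency :single` is the declarative replacement for the per-instance `workers_not_supported` call dropped above: the pipeline itself serializes event delivery into the output, instead of the plugin refusing extra workers at runtime. A bare-bones skeleton using the declaration (the class below is illustrative, not part of this plugin):

    # Illustrative output declaring single-threaded delivery.
    class LogStash::Outputs::Example < LogStash::Outputs::Base
      config_name "example"
      concurrency :single  # the pipeline never calls into this output concurrently

      def register
      end

      def receive(event)
        # safe to touch shared state here without extra locking
      end
    end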
@@ -256,11 +256,19 @@
file.write('test')
end
begin
write_on_bucket(test_filename)
- delete_on_bucket(test_filename)
+
+ begin
+ remote_filename = "#{@prefix}#{File.basename(test_filename)}"
+ bucket = @s3.buckets[@bucket]
+ bucket.objects[remote_filename].delete
+ rescue StandardError => e
+ # we actually only need `put_object`, but if we don't delete them
+ # we can end up with a lot of test files
+ end
ensure
File.delete(test_filename)
end
end
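The inline delete uses the aws-sdk v1 (`AWS::S3`) collection-style interface, in which buckets and objects are addressed like hashes. A standalone round-trip in that API looks roughly like this (bucket name and key are placeholders):

    # aws-sdk v1 object round-trip; bucket name and key are placeholders.
    require "aws-sdk"
    s3 = AWS::S3.new
    object = s3.buckets["my-bucket"].objects["logstash-access-test.txt"]
    object.write("test")  # counterpart of the write_on_bucket test upload
    object.delete         # counterpart of the rescue'd cleanup above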
@@ -462,27 +470,9 @@
end
private
def reset_page_counter
@page_counter = 0
- end
-
- private
- def delete_on_bucket(filename)
- bucket = @s3.buckets[@bucket]
-
- remote_filename = "#{@prefix}#{File.basename(filename)}"
-
- @logger.debug("S3: delete file from bucket", :remote_filename => remote_filename, :bucket => @bucket)
-
- begin
- # prepare for write the file
- object = bucket.objects[remote_filename]
- object.delete
- rescue AWS::Errors::Base => e
- @logger.error("S3: AWS error", :error => e)
- raise LogStash::ConfigurationError, "AWS Configuration Error"
- end
end
private
def move_file_to_bucket_async(file)
@logger.debug("S3: Sending the file to the upload queue.", :filename => File.basename(file))
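`move_file_to_bucket_async` is the producer half of a standard Ruby Queue hand-off: it enqueues the file name and returns immediately, while a worker thread pops entries and performs the actual upload. A generic sketch of the pattern (the worker below is illustrative, not the plugin's own upload thread):

    # Generic producer/consumer hand-off matching the enqueue above.
    require "thread"

    upload_queue = Queue.new
    worker = Thread.new do
      while (file = upload_queue.pop)  # blocks until something is enqueued
        puts "uploading #{file}"       # stand-in for the real upload call
      end
    end

    upload_queue.push("/opt/logstash/S3_temp/part0.txt")  # producer side
    upload_queue.push(nil)  # nil terminates the worker loop
    worker.join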