lib/logstash/outputs/s3/uploader.rb in logstash-output-s3-4.2.0 vs lib/logstash/outputs/s3/uploader.rb in logstash-output-s3-4.3.0
- old
+ new
@@ -4,25 +4,26 @@
module LogStash
module Outputs
class S3
class Uploader
- TIME_BEFORE_RETRYING_SECONDS = 1
+
DEFAULT_THREADPOOL = Concurrent::ThreadPoolExecutor.new({
:min_threads => 1,
:max_threads => 8,
:max_queue => 1,
:fallback_policy => :caller_runs
})
-
attr_reader :bucket, :upload_options, :logger
- def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL)
+ def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL, retry_count: Float::INFINITY, retry_delay: 1)
@bucket = bucket
@workers_pool = threadpool
@logger = logger
+ @retry_count = retry_count
+ @retry_delay = retry_delay
end
def upload_async(file, options = {})
@workers_pool.post do
LogStash::Util.set_thread_name("S3 output uploader, file: #{file.path}")
@@ -31,10 +32,11 @@
end
def upload(file, options = {})
upload_options = options.fetch(:upload_options, {})
+ tries = 0
begin
obj = bucket.object(file.key)
obj.upload_file(file.path, upload_options)
rescue Errno::ENOENT => e
logger.error("File doesn't exist! Unrecoverable error.", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
@@ -42,18 +44,25 @@
# When we get here it usually mean that S3 tried to do some retry by himself (default is 3)
# When the retry limit is reached or another error happen we will wait and retry.
#
# Thread might be stuck here, but I think its better than losing anything
# its either a transient errors or something bad really happened.
- logger.error("Uploading failed, retrying.", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
- sleep TIME_BEFORE_RETRYING_SECONDS
- retry
+ if tries < @retry_count
+ tries += 1
+ logger.warn("Uploading failed, retrying (##{tries} of #{@retry_count})", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
+ sleep @retry_delay
+ retry
+ else
+ logger.error("Failed to upload file (retried #{@retry_count} times).", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
+ end
end
- options[:on_complete].call(file) unless options[:on_complete].nil?
- rescue => e
- logger.error("An error occured in the `on_complete` uploader", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
- raise e # reraise it since we don't deal with it now
+ begin
+ options[:on_complete].call(file) unless options[:on_complete].nil?
+ rescue => e
+ logger.error("An error occurred in the `on_complete` uploader", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
+ raise e # reraise it since we don't deal with it now
+ end
end
def stop
@workers_pool.shutdown
@workers_pool.wait_for_termination(nil) # block until its done