lib/logstash/outputs/s3.rb in logstash-output-s3-3.1.1 vs lib/logstash/outputs/s3.rb in logstash-output-s3-3.1.2
- old
+ new
@@ -133,11 +133,11 @@
# Specify the content encoding. Supports ("gzip"). Defaults to "none"
config :encoding, :validate => ["none", "gzip"], :default => "none"

# Exposed attributes for testing purpose.
attr_accessor :tempfile
attr_reader :page_counter, :upload_workers, :s3
def aws_s3_config
@logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @region)
@s3 = AWS::S3.new(full_options)
@@ -368,11 +368,11 @@
end
private

# Gracefully stop the upload worker pool by enqueueing the shutdown
# sentinel once. The worker that dequeues it re-enqueues it before
# exiting (see #upload_worker), so a single sentinel cascades through
# every worker.
def shutdown_upload_workers
  @logger.debug("S3: Gracefully shutdown the upload workers")
  # Use #enq to stay consistent with how #upload_worker enqueues items.
  @upload_queue.enq(LogStash::SHUTDOWN)
end
private
def handle_event(encoded_event)
if write_events_to_multiple_files?
@@ -419,29 +419,41 @@
# Spawn @upload_workers_count background tasks; each repeatedly hands one
# queued file to #upload_worker until that method reports the shutdown
# sentinel was seen.
@upload_workers = @upload_workers_count.times.map do |worker_id|
  Stud::Task.new do
    LogStash::Util::set_thread_name("<S3 upload worker #{worker_id}")
    # Idiomatic loop/break instead of a manual boolean flag:
    # #upload_worker returns false only for the shutdown sentinel.
    loop do
      @logger.debug("S3: upload worker is waiting for a new file to upload.", :worker_id => worker_id)
      break unless upload_worker
    end
  end
end
end
private

# Process a single item from the upload queue.
#
# Returns false when the shutdown sentinel was dequeued — the sentinel is
# re-enqueued first so sibling workers also see it and stop — and true
# otherwise; the return value tells the worker loop whether to keep going.
#
# On upload failure the file is re-enqueued for another attempt and the
# worker keeps running. NOTE(review): a permanently failing file will be
# retried forever; consider bounding retries.
def upload_worker
  file = nil
  begin
    file = @upload_queue.deq

    if file == LogStash::SHUTDOWN
      @logger.debug("S3: upload worker is shutting down gracefully")
      # Propagate the sentinel so the remaining workers shut down too.
      @upload_queue.enq(LogStash::SHUTDOWN)
      false
    else
      @logger.debug("S3: upload worker is uploading a new file", :filename => File.basename(file))
      move_file_to_bucket(file)
      true
    end
  rescue => ex # StandardError only: rescuing Exception would swallow Interrupt/SystemExit/NoMemoryError
    @logger.error("failed to upload, will re-enqueue #{file} for upload",
                  :ex => ex, :backtrace => ex.backtrace)
    # file stays nil in the rare case where the deq itself raised.
    @upload_queue.enq(file) unless file.nil?
    true
  end
end
private
def next_page