lib/logstash/outputs/s3.rb in logstash-output-s3-3.1.1 vs lib/logstash/outputs/s3.rb in logstash-output-s3-3.1.2

- old
+ new

@@ -133,11 +133,11 @@ # Specify the content encoding. Supports ("gzip"). Defaults to "none" config :encoding, :validate => ["none", "gzip"], :default => "none" # Exposed attributes for testing purpose. attr_accessor :tempfile - attr_reader :page_counter + attr_reader :page_counter, :upload_workers attr_reader :s3 def aws_s3_config @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @region) @s3 = AWS::S3.new(full_options) @@ -368,11 +368,11 @@ end private def shutdown_upload_workers @logger.debug("S3: Gracefully shutdown the upload workers") - @upload_queue << LogStash::ShutdownEvent + @upload_queue << LogStash::SHUTDOWN end private def handle_event(encoded_event) if write_events_to_multiple_files? @@ -419,29 +419,41 @@ @upload_workers = @upload_workers_count.times.map do |worker_id| Stud::Task.new do LogStash::Util::set_thread_name("<S3 upload worker #{worker_id}") - while true do + continue = true + while continue do @logger.debug("S3: upload worker is waiting for a new file to upload.", :worker_id => worker_id) - upload_worker + continue = upload_worker end end end end private def upload_worker - file = @upload_queue.deq + file = nil + begin + file = @upload_queue.deq - case file - when LogStash::ShutdownEvent + if file == LogStash::SHUTDOWN @logger.debug("S3: upload worker is shutting down gracefuly") - @upload_queue.enq(LogStash::ShutdownEvent) + @upload_queue.enq(LogStash::SHUTDOWN) + false else @logger.debug("S3: upload working is uploading a new file", :filename => File.basename(file)) move_file_to_bucket(file) + true + end + rescue Exception => ex + @logger.error("failed to upload, will re-enqueue #{file} for upload", + :ex => ex, :backtrace => ex.backtrace) + unless file.nil? # Rare case if the first line of the begin doesn't execute + @upload_queue.enq(file) + end + true end end private def next_page