lib/sqs_buffer/client.rb in sqs_buffer-0.3.0 vs lib/sqs_buffer/client.rb in sqs_buffer-0.3.1

- old
+ new

@@ -10,11 +10,10 @@ @poller = Aws::SQS::QueuePoller.new(@queue_url, client: client) @skip_delete = opts.fetch(:skip_delete, true) @max_number_of_messages = opts.fetch(:max_number_of_messages, 10).to_i @logger = opts.fetch(:logger, Logger.new(STDOUT)) - @process_block = Concurrent::MutexAtomicReference.new @before_request_block = Concurrent::MutexAtomicReference.new @process_block = Concurrent::MutexAtomicReference.new @message_queue = Concurrent::Array.new @last_process_time = Concurrent::AtomicFixnum.new(Time.now.to_i) @running = Concurrent::AtomicBoolean.new(false) @@ -119,29 +118,35 @@ raise ":#{k} is a required key!" end def configure_before_request_block @poller.before_request do |stats| - if @running.false? - @logger.info "Shutting down. Processing all messages first..." - process_all_messages - @logger.info "All messages have been processed. Throwing :stop_polling" - throw :stop_polling + begin + if @running.false? + @logger.info "Shutting down. Processing all messages first..." + process_all_messages + @logger.info "All messages have been processed. Throwing :stop_polling" + throw :stop_polling + end + if @before_request_block.value + @before_request_block.value.call(stats) + end + if need_to_process? + process_all_messages + end + rescue => e + @logger.error "Exception: #{e.message} in before_request block. | Backtrace: #{e.backtrace}" end - if @before_request_block.value - @before_request_block.value.call(stats) - end - if need_to_process? - process_all_messages - end end # End Poller loop end def store_messages(messages) messages.each do |msg| store_message(msg) end + rescue => e + @logger.error "Exception: #{e.message} while storing messages: #{messages} | Backtrace: #{e.backtrace}" end def store_message(msg) @message_queue << msg rescue StandardError => e @@ -156,10 +161,10 @@ while @message_queue.length > 0 do begin messages = @message_queue.shift(10) @poller.delete_messages(messages) rescue StandardError => e - @logger.error "An exception(#{e.message}) occurred while deleting these messages: #{messages.join("\n")} | Backtrace: #{e.backtrace}" + @logger.error "An exception(#{e.message}) occurred while deleting these messages: #{messages} | Backtrace: #{e.backtrace}" end end end end