lib/sqs_buffer/client.rb in sqs_buffer-0.3.0 vs lib/sqs_buffer/client.rb in sqs_buffer-0.3.1
- old
+ new
@@ -10,11 +10,10 @@
@poller = Aws::SQS::QueuePoller.new(@queue_url, client: client)
@skip_delete = opts.fetch(:skip_delete, true)
@max_number_of_messages = opts.fetch(:max_number_of_messages, 10).to_i
@logger = opts.fetch(:logger, Logger.new(STDOUT))
- @process_block = Concurrent::MutexAtomicReference.new
@before_request_block = Concurrent::MutexAtomicReference.new
@process_block = Concurrent::MutexAtomicReference.new
@message_queue = Concurrent::Array.new
@last_process_time = Concurrent::AtomicFixnum.new(Time.now.to_i)
@running = Concurrent::AtomicBoolean.new(false)
@@ -119,29 +118,35 @@
raise ":#{k} is a required key!"
end
def configure_before_request_block
@poller.before_request do |stats|
- if @running.false?
- @logger.info "Shutting down. Processing all messages first..."
- process_all_messages
- @logger.info "All messages have been processed. Throwing :stop_polling"
- throw :stop_polling
+ begin
+ if @running.false?
+ @logger.info "Shutting down. Processing all messages first..."
+ process_all_messages
+ @logger.info "All messages have been processed. Throwing :stop_polling"
+ throw :stop_polling
+ end
+ if @before_request_block.value
+ @before_request_block.value.call(stats)
+ end
+ if need_to_process?
+ process_all_messages
+ end
+ rescue => e
+ @logger.error "Exception: #{e.message} in before_request block. | Backtrace: #{e.backtrace}"
end
- if @before_request_block.value
- @before_request_block.value.call(stats)
- end
- if need_to_process?
- process_all_messages
- end
end # End Poller loop
end
def store_messages(messages)
messages.each do |msg|
store_message(msg)
end
+ rescue => e
+ @logger.error "Exception: #{e.message} while storing messages: #{messages} | Backtrace: #{e.backtrace}"
end
def store_message(msg)
@message_queue << msg
rescue StandardError => e
@@ -156,10 +161,10 @@
while @message_queue.length > 0 do
begin
messages = @message_queue.shift(10)
@poller.delete_messages(messages)
rescue StandardError => e
- @logger.error "An exception(#{e.message}) occurred while deleting these messages: #{messages.join("\n")} | Backtrace: #{e.backtrace}"
+ @logger.error "An exception(#{e.message}) occurred while deleting these messages: #{messages} | Backtrace: #{e.backtrace}"
end
end
end
end