lib/sqewer/connection.rb in sqewer-5.0.8 vs lib/sqewer/connection.rb in sqewer-5.0.9

- old
+ new

@@ -7,10 +7,11 @@ # * any execution that ends with an exception should cause the message to be re-enqueued class Sqewer::Connection DEFAULT_TIMEOUT_SECONDS = 5 BATCH_RECEIVE_SIZE = 10 MAX_RANDOM_FAILURES_PER_CALL = 10 + MAX_RANDOM_RECEIVE_FAILURES = 100 # sure to hit the max_elapsed_time of 900 seconds NotOurFaultAwsError = Class.new(StandardError) # A wrapper for most important properties of the received message class Message < Struct.new(:receipt_handle, :body) @@ -37,18 +38,21 @@ def initialize(queue_url) require 'aws-sdk' @queue_url = queue_url end - # Receive at most 10 messages from the queue, and return the array of Message objects. + # Receive at most 10 messages from the queue, and return the array of Message objects. Retries for at + # most 900 seconds (15 minutes) and then gives up, thereby crashing the read loop. If SQS is not available + # even after 15 minutes it is either down or the server is misconfigured. Either way it makes no sense to + # continue. # # @return [Array<Message>] an array of Message objects def receive_messages - response = client.receive_message(queue_url: @queue_url, - wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: 10) - response.messages.map do | message | - Message.new(message.receipt_handle, message.body) + Retriable.retriable on: Seahorse::Client::NetworkingError, tries: MAX_RANDOM_RECEIVE_FAILURES do + response = client.receive_message(queue_url: @queue_url, + wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: BATCH_RECEIVE_SIZE) + response.messages.map {|message| Message.new(message.receipt_handle, message.body) } end end # Send a message to the backing queue # @@ -123,11 +127,11 @@ end private def handle_batch_with_retries(method, batch) - Retriable.retriable on: NotOurFaultAwsError, tries: MAX_RANDOM_FAILURES_PER_CALL do + Retriable.retriable on: [NotOurFaultAwsError, Seahorse::Client::NetworkingError], tries: MAX_RANDOM_FAILURES_PER_CALL do resp = client.send(method, queue_url: @queue_url, entries: batch) wrong_messages, aws_failures = resp.failed.partition {|m| m.sender_fault } if wrong_messages.any? err = wrong_messages.inspect + ', ' + resp.inspect raise "#{wrong_messages.length} messages failed while doing #{method.to_s} with error: #{err}" @@ -137,29 +141,9 @@ raise NotOurFaultAwsError end end end - class RetryWrapper < Struct.new(:sqs_client) - MAX_RETRIES = 1000 - # Provide retrying wrappers for all the methods of Aws::SQS::Client that we actually use - [:delete_message_batch, :send_message_batch, :receive_message].each do |retriable_method_name| - define_method(retriable_method_name) do |*args, **kwargs| - tries = 1 - begin - sqs_client.public_send(retriable_method_name, *args, **kwargs) - rescue Seahorse::Client::NetworkingError => e - if (tries += 1) >= MAX_RETRIES - raise(e) - else - sleep 0.5 - retry - end - end - end - end - end - def client - @client ||= RetryWrapper.new(Aws::SQS::Client.new) + @client ||= Aws::SQS::Client.new end end