lib/sqewer/connection.rb in sqewer-5.0.8 vs lib/sqewer/connection.rb in sqewer-5.0.9
- old
+ new
@@ -7,10 +7,11 @@
# * any execution that ends with an exception should cause the message to be re-enqueued
class Sqewer::Connection
DEFAULT_TIMEOUT_SECONDS = 5
BATCH_RECEIVE_SIZE = 10
MAX_RANDOM_FAILURES_PER_CALL = 10
+ MAX_RANDOM_RECEIVE_FAILURES = 100 # deliberately high, so that Retriable's max_elapsed_time of 900 seconds is reached before the tries run out
NotOurFaultAwsError = Class.new(StandardError)
# A wrapper for most important properties of the received message
class Message < Struct.new(:receipt_handle, :body)
@@ -37,18 +38,21 @@
def initialize(queue_url)
require 'aws-sdk'
@queue_url = queue_url
end
- # Receive at most 10 messages from the queue, and return the array of Message objects.
+ # Receive at most 10 messages from the queue, and return the array of Message objects. Networking errors
+ # are retried for at most 900 seconds (15 minutes); after that the call gives up, thereby crashing the read
+ # loop. If SQS is still not available after 15 minutes, it is either down or the server is misconfigured.
+ # Either way it makes no sense to continue.
#
# @return [Array<Message>] an array of Message objects
def receive_messages
- response = client.receive_message(queue_url: @queue_url,
- wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: 10)
- response.messages.map do | message |
- Message.new(message.receipt_handle, message.body)
+ Retriable.retriable on: Seahorse::Client::NetworkingError, tries: MAX_RANDOM_RECEIVE_FAILURES do
+ response = client.receive_message(queue_url: @queue_url,
+ wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: BATCH_RECEIVE_SIZE)
+ response.messages.map {|message| Message.new(message.receipt_handle, message.body) }
end
end
# Send a message to the backing queue
#
@@ -123,11 +127,11 @@
end
private
def handle_batch_with_retries(method, batch)
- Retriable.retriable on: NotOurFaultAwsError, tries: MAX_RANDOM_FAILURES_PER_CALL do
+ Retriable.retriable on: [NotOurFaultAwsError, Seahorse::Client::NetworkingError], tries: MAX_RANDOM_FAILURES_PER_CALL do
resp = client.send(method, queue_url: @queue_url, entries: batch)
wrong_messages, aws_failures = resp.failed.partition {|m| m.sender_fault }
if wrong_messages.any?
err = wrong_messages.inspect + ', ' + resp.inspect
raise "#{wrong_messages.length} messages failed while doing #{method.to_s} with error: #{err}"
@@ -137,29 +141,9 @@
raise NotOurFaultAwsError
end
end
end
- class RetryWrapper < Struct.new(:sqs_client)
- MAX_RETRIES = 1000
- # Provide retrying wrappers for all the methods of Aws::SQS::Client that we actually use
- [:delete_message_batch, :send_message_batch, :receive_message].each do |retriable_method_name|
- define_method(retriable_method_name) do |*args, **kwargs|
- tries = 1
- begin
- sqs_client.public_send(retriable_method_name, *args, **kwargs)
- rescue Seahorse::Client::NetworkingError => e
- if (tries += 1) >= MAX_RETRIES
- raise(e)
- else
- sleep 0.5
- retry
- end
- end
- end
- end
- end
-
def client
- @client ||= RetryWrapper.new(Aws::SQS::Client.new)
+ @client ||= Aws::SQS::Client.new
end
end