lib/sqewer/connection.rb in sqewer-4.1.0 vs lib/sqewer/connection.rb in sqewer-4.2.0

- old
+ new

@@ -38,11 +38,10 @@ # Receive at most 10 messages from the queue, and return the array of Message objects. # # @return [Array<Message>] an array of Message objects def receive_messages - client = ::Aws::SQS::Client.new response = client.receive_message(queue_url: @queue_url, wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: 10) response.messages.map do | message | Message.new(message.receipt_handle, message.body) end @@ -56,30 +55,33 @@ # @return [void] def send_message(message_body, **kwargs_for_send) send_multiple_messages {|via| via.send_message(message_body, **kwargs_for_send) } end + # Stores the messages for the SQS queue (both deletes and sends), and yields them in allowed batch sizes class MessageBuffer < Struct.new(:messages) MAX_RECORDS = 10 def initialize super([]) end def each_batch messages.each_slice(MAX_RECORDS){|batch| yield(batch)} end end + # Saves the messages to send to the SQS queue class SendBuffer < MessageBuffer def send_message(message_body, **kwargs_for_send) # The "id" is only valid _within_ the request, and is used when # an error response refers to a specific ID within a batch m = {message_body: message_body, id: messages.length.to_s} m[:delay_seconds] = kwargs_for_send[:delay_seconds] if kwargs_for_send[:delay_seconds] messages << m end end + # Saves the receipt handles to batch-delete from the SQS queue class DeleteBuffer < MessageBuffer def delete_message(receipt_handle) # The "id" is only valid _within_ the request, and is used when # an error response refers to a specific ID within a batch m = {receipt_handle: receipt_handle, id: messages.length.to_s} @@ -92,11 +94,10 @@ # @yield [#send_message] the object you can send messages through (will be flushed at method return) # @return [void] def send_multiple_messages buffer = SendBuffer.new yield(buffer) - client = ::Aws::SQS::Client.new buffer.each_batch do | batch | resp = client.send_message_batch(queue_url: @queue_url, entries: batch) failed = resp.failed if failed.any? err = failed[0].message @@ -119,16 +120,41 @@ # @return [void] def delete_multiple_messages buffer = DeleteBuffer.new yield(buffer) - client = ::Aws::SQS::Client.new buffer.each_batch do | batch | resp = client.delete_message_batch(queue_url: @queue_url, entries: batch) failed = resp.failed if failed.any? err = failed[0].message raise "%d messages failed to delete (first error was %s)" % [failed.length, err] end end + end + + private + + class RetryWrapper < Struct.new(:sqs_client) + MAX_RETRIES = 1000 + # Provide retrying wrappers for all the methods of Aws::SQS::Client that we actually use + [:delete_message_batch, :send_message_batch, :receive_message].each do |retriable_method_name| + define_method(retriable_method_name) do |*args, **kwargs| + tries = 1 + begin + sqs_client.public_send(retriable_method_name, *args, **kwargs) + rescue Seahorse::Client::NetworkingError => e + if (tries += 1) >= MAX_RETRIES + raise(e) + else + sleep 0.5 + retry + end + end + end + end + end + + def client + @client ||= RetryWrapper.new(Aws::SQS::Client.new) end end