lib/sqewer/connection.rb in sqewer-4.1.0 vs lib/sqewer/connection.rb in sqewer-4.2.0
- old
+ new
@@ -38,11 +38,10 @@
# Receive at most 10 messages from the queue, and return the array of Message objects.
#
# @return [Array<Message>] an array of Message objects
def receive_messages
- client = ::Aws::SQS::Client.new
response = client.receive_message(queue_url: @queue_url,
wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: 10)
response.messages.map do | message |
Message.new(message.receipt_handle, message.body)
end
@@ -56,30 +55,33 @@
# @return [void]
def send_message(message_body, **kwargs_for_send)
send_multiple_messages {|via| via.send_message(message_body, **kwargs_for_send) }
end
+ # Stores the messages for the SQS queue (both deletes and sends), and yields them in allowed batch sizes
class MessageBuffer < Struct.new(:messages)
MAX_RECORDS = 10
def initialize
super([])
end
def each_batch
messages.each_slice(MAX_RECORDS){|batch| yield(batch)}
end
end
+ # Saves the messages to send to the SQS queue
class SendBuffer < MessageBuffer
def send_message(message_body, **kwargs_for_send)
# The "id" is only valid _within_ the request, and is used when
# an error response refers to a specific ID within a batch
m = {message_body: message_body, id: messages.length.to_s}
m[:delay_seconds] = kwargs_for_send[:delay_seconds] if kwargs_for_send[:delay_seconds]
messages << m
end
end
+ # Saves the receipt handles to batch-delete from the SQS queue
class DeleteBuffer < MessageBuffer
def delete_message(receipt_handle)
# The "id" is only valid _within_ the request, and is used when
# an error response refers to a specific ID within a batch
m = {receipt_handle: receipt_handle, id: messages.length.to_s}
@@ -92,11 +94,10 @@
# @yield [#send_message] the object you can send messages through (will be flushed at method return)
# @return [void]
def send_multiple_messages
buffer = SendBuffer.new
yield(buffer)
- client = ::Aws::SQS::Client.new
buffer.each_batch do | batch |
resp = client.send_message_batch(queue_url: @queue_url, entries: batch)
failed = resp.failed
if failed.any?
err = failed[0].message
@@ -119,16 +120,41 @@
# @return [void]
def delete_multiple_messages
buffer = DeleteBuffer.new
yield(buffer)
- client = ::Aws::SQS::Client.new
buffer.each_batch do | batch |
resp = client.delete_message_batch(queue_url: @queue_url, entries: batch)
failed = resp.failed
if failed.any?
err = failed[0].message
raise "%d messages failed to delete (first error was %s)" % [failed.length, err]
end
end
+ end
+
+ private
+
+ class RetryWrapper < Struct.new(:sqs_client)
+ MAX_RETRIES = 1000
+ # Provide retrying wrappers for all the methods of Aws::SQS::Client that we actually use
+ [:delete_message_batch, :send_message_batch, :receive_message].each do |retriable_method_name|
+ define_method(retriable_method_name) do |*args, **kwargs|
+ tries = 1
+ begin
+ sqs_client.public_send(retriable_method_name, *args, **kwargs)
+ rescue Seahorse::Client::NetworkingError => e
+ if (tries += 1) >= MAX_RETRIES
+ raise(e)
+ else
+ sleep 0.5
+ retry
+ end
+ end
+ end
+ end
+ end
+
+ def client
+ @client ||= RetryWrapper.new(Aws::SQS::Client.new)
end
end