lib/sqewer/connection.rb in sqewer-4.0.1 vs lib/sqewer/connection.rb in sqewer-4.1.0
- old
+ new
@@ -53,18 +53,82 @@
# @param message_body[String] the message to send
# @param kwargs_for_send[Hash] additional arguments for the submit (such as `delay_seconds`).
# Passes the arguments to the AWS SDK.
# @return [void]
def send_message(message_body, **kwargs_for_send)
+ send_multiple_messages {|via| via.send_message(message_body, **kwargs_for_send) }
+ end
+
+ class MessageBuffer < Struct.new(:messages)
+ MAX_RECORDS = 10
+ def initialize
+ super([])
+ end
+ def each_batch
+ messages.each_slice(MAX_RECORDS){|batch| yield(batch)}
+ end
+ end
+
+ class SendBuffer < MessageBuffer
+ def send_message(message_body, **kwargs_for_send)
+ # The "id" is only valid _within_ the request, and is used when
+ # an error response refers to a specific ID within a batch
+ m = {message_body: message_body, id: messages.length.to_s}
+ m[:delay_seconds] = kwargs_for_send[:delay_seconds] if kwargs_for_send[:delay_seconds]
+ messages << m
+ end
+ end
+
+ class DeleteBuffer < MessageBuffer
+ def delete_message(receipt_handle)
+ # The "id" is only valid _within_ the request, and is used when
+ # an error response refers to a specific ID within a batch
+ m = {receipt_handle: receipt_handle, id: messages.length.to_s}
+ messages << m
+ end
+ end
+
+ # Send multiple messages. If any messages fail to send, an exception will be raised.
+ #
+ # @yield [#send_message] the object you can send messages through (will be flushed at method return)
+ # @return [void]
+ def send_multiple_messages
+ buffer = SendBuffer.new
+ yield(buffer)
client = ::Aws::SQS::Client.new
- client.send_message(queue_url: @queue_url, message_body: message_body, **kwargs_for_send)
+ buffer.each_batch do | batch |
+ resp = client.send_message_batch(queue_url: @queue_url, entries: batch)
+ failed = resp.failed
+ if failed.any?
+ err = failed[0].message
+ raise "%d messages failed to send (first error was %s)" % [failed.length, err]
+ end
+ end
end
# Deletes a message after it has been succesfully decoded and processed
#
# @param message_identifier[String] the ID of the message to delete. For SQS, it is the receipt handle
# @return [void]
def delete_message(message_identifier)
+ delete_multiple_messages {|via| via.delete_message(message_identifier) }
+ end
+
+ # Deletes multiple messages after they all have been succesfully decoded and processed.
+ #
+ # @yield [#delete_message] an object you can delete an individual message through
+ # @return [void]
+ def delete_multiple_messages
+ buffer = DeleteBuffer.new
+ yield(buffer)
+
client = ::Aws::SQS::Client.new
- client.delete_message(queue_url: @queue_url, receipt_handle: message_identifier)
+ buffer.each_batch do | batch |
+ resp = client.delete_message_batch(queue_url: @queue_url, entries: batch)
+ failed = resp.failed
+ if failed.any?
+ err = failed[0].message
+ raise "%d messages failed to delete (first error was %s)" % [failed.length, err]
+ end
+ end
end
end