lib/sqewer/connection.rb in sqewer-4.0.1 vs lib/sqewer/connection.rb in sqewer-4.1.0

- old
+ new

@@ -53,18 +53,82 @@ # @param message_body[String] the message to send # @param kwargs_for_send[Hash] additional arguments for the submit (such as `delay_seconds`). # Passes the arguments to the AWS SDK. # @return [void] def send_message(message_body, **kwargs_for_send) + send_multiple_messages {|via| via.send_message(message_body, **kwargs_for_send) } + end + + class MessageBuffer < Struct.new(:messages) + MAX_RECORDS = 10 + def initialize + super([]) + end + def each_batch + messages.each_slice(MAX_RECORDS){|batch| yield(batch)} + end + end + + class SendBuffer < MessageBuffer + def send_message(message_body, **kwargs_for_send) + # The "id" is only valid _within_ the request, and is used when + # an error response refers to a specific ID within a batch + m = {message_body: message_body, id: messages.length.to_s} + m[:delay_seconds] = kwargs_for_send[:delay_seconds] if kwargs_for_send[:delay_seconds] + messages << m + end + end + + class DeleteBuffer < MessageBuffer + def delete_message(receipt_handle) + # The "id" is only valid _within_ the request, and is used when + # an error response refers to a specific ID within a batch + m = {receipt_handle: receipt_handle, id: messages.length.to_s} + messages << m + end + end + + # Send multiple messages. If any messages fail to send, an exception will be raised. + # + # @yield [#send_message] the object you can send messages through (will be flushed at method return) + # @return [void] + def send_multiple_messages + buffer = SendBuffer.new + yield(buffer) client = ::Aws::SQS::Client.new - client.send_message(queue_url: @queue_url, message_body: message_body, **kwargs_for_send) + buffer.each_batch do | batch | + resp = client.send_message_batch(queue_url: @queue_url, entries: batch) + failed = resp.failed + if failed.any? + err = failed[0].message + raise "%d messages failed to send (first error was %s)" % [failed.length, err] + end + end end # Deletes a message after it has been succesfully decoded and processed # # @param message_identifier[String] the ID of the message to delete. For SQS, it is the receipt handle # @return [void] def delete_message(message_identifier) + delete_multiple_messages {|via| via.delete_message(message_identifier) } + end + + # Deletes multiple messages after they all have been succesfully decoded and processed. + # + # @yield [#delete_message] an object you can delete an individual message through + # @return [void] + def delete_multiple_messages + buffer = DeleteBuffer.new + yield(buffer) + client = ::Aws::SQS::Client.new - client.delete_message(queue_url: @queue_url, receipt_handle: message_identifier) + buffer.each_batch do | batch | + resp = client.delete_message_batch(queue_url: @queue_url, entries: batch) + failed = resp.failed + if failed.any? + err = failed[0].message + raise "%d messages failed to delete (first error was %s)" % [failed.length, err] + end + end end end