lib/sqewer/connection.rb in sqewer-1.0.0 vs lib/sqewer/connection.rb in sqewer-2.0.0

- old
+ new

@@ -7,10 +7,21 @@ # * any execution that ends with an exception should cause the message to be re-enqueued class Sqewer::Connection DEFAULT_TIMEOUT_SECONDS = 5 BATCH_RECEIVE_SIZE = 10 + # A wrapper for most important properties of the received message + class Message < Struct.new(:receipt_handle, :body) + def inspect + body.inspect + end + + def has_body? + body && !body.empty? + end + end + # Returns the default adapter, connected to the queue set via the `SQS_QUEUE_URL` # environment variable. def self.default new(ENV.fetch('SQS_QUEUE_URL')) rescue KeyError => e @@ -23,25 +34,17 @@ def initialize(queue_url) require 'aws-sdk' @queue_url = queue_url end - # Poll for messages, and return if no records are received within the given period. + # Receive at most 10 messages from the queue, and return the array of Message objects. # - # @param timeout[Fixnum] the number of seconds to wait before returning if no messages appear on the queue - # @yield [String, String] the receipt identifier and contents of the message body - # @return [void] - def poll(timeout = DEFAULT_TIMEOUT_SECONDS) - poller = ::Aws::SQS::QueuePoller.new(@queue_url) - # SDK v2 automatically deletes messages if the block returns normally, but we want it to happen manually - # from the caller. - poller.poll(max_number_of_messages: BATCH_RECEIVE_SIZE, skip_delete: true, - idle_timeout: timeout.to_i, wait_time_seconds: timeout.to_i) do | sqs_messages | - - sqs_messages.each do | sqs_message | - yield [sqs_message.receipt_handle, sqs_message.body] - end - + # @return [Array<Message>] an array of Message objects + def receive_messages + client = ::Aws::SQS::Client.new + response = client.receive_message(queue_url: @queue_url, wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: 10) + response.messages.map do | message | + Message.new(message.receipt_handle, message.body) end end # Send a message to the backing queue #