lib/sqewer/connection.rb in sqewer-1.0.0 vs lib/sqewer/connection.rb in sqewer-2.0.0
- old
+ new
@@ -7,10 +7,21 @@
# * any execution that ends with an exception should cause the message to be re-enqueued
class Sqewer::Connection
DEFAULT_TIMEOUT_SECONDS = 5
BATCH_RECEIVE_SIZE = 10
+ # A wrapper for most important properties of the received message
+ class Message < Struct.new(:receipt_handle, :body)
+ def inspect
+ body.inspect
+ end
+
+ def has_body?
+ body && !body.empty?
+ end
+ end
+
# Returns the default adapter, connected to the queue set via the `SQS_QUEUE_URL`
# environment variable.
def self.default
new(ENV.fetch('SQS_QUEUE_URL'))
rescue KeyError => e
@@ -23,25 +34,17 @@
def initialize(queue_url)
require 'aws-sdk'
@queue_url = queue_url
end
- # Poll for messages, and return if no records are received within the given period.
+ # Receive at most 10 messages from the queue, and return the array of Message objects.
#
- # @param timeout[Fixnum] the number of seconds to wait before returning if no messages appear on the queue
- # @yield [String, String] the receipt identifier and contents of the message body
- # @return [void]
- def poll(timeout = DEFAULT_TIMEOUT_SECONDS)
- poller = ::Aws::SQS::QueuePoller.new(@queue_url)
- # SDK v2 automatically deletes messages if the block returns normally, but we want it to happen manually
- # from the caller.
- poller.poll(max_number_of_messages: BATCH_RECEIVE_SIZE, skip_delete: true,
- idle_timeout: timeout.to_i, wait_time_seconds: timeout.to_i) do | sqs_messages |
-
- sqs_messages.each do | sqs_message |
- yield [sqs_message.receipt_handle, sqs_message.body]
- end
-
+ # @return [Array<Message>] an array of Message objects
+ def receive_messages
+ client = ::Aws::SQS::Client.new
+ response = client.receive_message(queue_url: @queue_url, wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: 10)
+ response.messages.map do | message |
+ Message.new(message.receipt_handle, message.body)
end
end
# Send a message to the backing queue
#