lib/sqewer/connection.rb in sqewer-5.0.0 vs lib/sqewer/connection.rb in sqewer-5.0.1
- old
+ new
@@ -6,91 +6,91 @@
# * no message should be deleted if the receiving client has not deleted it explicitly
# * any execution that ends with an exception should cause the message to be re-enqueued
class Sqewer::Connection
DEFAULT_TIMEOUT_SECONDS = 5
BATCH_RECEIVE_SIZE = 10
-
+
# A wrapper for most important properties of the received message
class Message < Struct.new(:receipt_handle, :body)
def inspect
body.inspect
end
-
+
def has_body?
body && !body.empty?
end
end
-
+
# Returns the default adapter, connected to the queue set via the `SQS_QUEUE_URL`
# environment variable.
def self.default
new(ENV.fetch('SQS_QUEUE_URL'))
rescue KeyError => e
raise "SQS_QUEUE_URL not set in the environment. This is the queue URL that the default that Sqewer uses"
end
-
+
# Initializes a new adapter, with access to the SQS queue at the given URL.
#
# @param queue_url[String] the SQS queue URL (the URL can be copied from your AWS console)
def initialize(queue_url)
require 'aws-sdk'
@queue_url = queue_url
end
-
+
# Receive at most 10 messages from the queue, and return the array of Message objects.
#
# @return [Array<Message>] an array of Message objects
def receive_messages
response = client.receive_message(queue_url: @queue_url,
wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: 10)
response.messages.map do | message |
Message.new(message.receipt_handle, message.body)
end
end
-
+
# Send a message to the backing queue
#
# @param message_body[String] the message to send
# @param kwargs_for_send[Hash] additional arguments for the submit (such as `delay_seconds`).
# Passes the arguments to the AWS SDK.
# @return [void]
def send_message(message_body, **kwargs_for_send)
send_multiple_messages {|via| via.send_message(message_body, **kwargs_for_send) }
end
-
+
# Stores the messages for the SQS queue (both deletes and sends), and yields them in allowed batch sizes
class MessageBuffer < Struct.new(:messages)
MAX_RECORDS = 10
def initialize
super([])
end
def each_batch
messages.each_slice(MAX_RECORDS){|batch| yield(batch)}
end
end
-
+
# Saves the messages to send to the SQS queue
class SendBuffer < MessageBuffer
def send_message(message_body, **kwargs_for_send)
# The "id" is only valid _within_ the request, and is used when
# an error response refers to a specific ID within a batch
m = {message_body: message_body, id: messages.length.to_s}
m[:delay_seconds] = kwargs_for_send[:delay_seconds] if kwargs_for_send[:delay_seconds]
messages << m
end
end
-
+
# Saves the receipt handles to batch-delete from the SQS queue
class DeleteBuffer < MessageBuffer
def delete_message(receipt_handle)
# The "id" is only valid _within_ the request, and is used when
# an error response refers to a specific ID within a batch
m = {receipt_handle: receipt_handle, id: messages.length.to_s}
messages << m
end
end
-
+
# Send multiple messages. If any messages fail to send, an exception will be raised.
#
# @yield [#send_message] the object you can send messages through (will be flushed at method return)
# @return [void]
def send_multiple_messages
@@ -103,39 +103,39 @@
err = failed[0].message
raise "%d messages failed to send (first error was %s)" % [failed.length, err]
end
end
end
-
+
# Deletes a message after it has been succesfully decoded and processed
#
# @param message_identifier[String] the ID of the message to delete. For SQS, it is the receipt handle
# @return [void]
def delete_message(message_identifier)
delete_multiple_messages {|via| via.delete_message(message_identifier) }
end
-
+
# Deletes multiple messages after they all have been succesfully decoded and processed.
#
# @yield [#delete_message] an object you can delete an individual message through
# @return [void]
def delete_multiple_messages
buffer = DeleteBuffer.new
yield(buffer)
-
+
buffer.each_batch do | batch |
resp = client.delete_message_batch(queue_url: @queue_url, entries: batch)
failed = resp.failed
if failed.any?
err = failed[0].message
raise "%d messages failed to delete (first error was %s)" % [failed.length, err]
end
end
end
-
+
private
-
+
class RetryWrapper < Struct.new(:sqs_client)
MAX_RETRIES = 1000
# Provide retrying wrappers for all the methods of Aws::SQS::Client that we actually use
[:delete_message_batch, :send_message_batch, :receive_message].each do |retriable_method_name|
define_method(retriable_method_name) do |*args, **kwargs|
@@ -151,10 +151,10 @@
end
end
end
end
end
-
+
def client
@client ||= RetryWrapper.new(Aws::SQS::Client.new)
end
end