lib/sqewer/connection.rb in sqewer-7.0.0 vs lib/sqewer/connection.rb in sqewer-8.0.0
- old
+ new
@@ -37,27 +37,40 @@
end
rescue KeyError => e
raise "SQS_QUEUE_URL not set in the environment. This is the queue URL Sqewer uses by default."
end
+ # Returns a singleton of Aws::SQS::Client
+ def self.client
+ # It's better using a singleton client to prevent making a lot of HTTP
+ # requests to the AWS metadata endpoint when getting credentials.
+ @client ||= begin
+ require 'aws-sdk-sqs'
+ ::Aws::SQS::Client.new(
+ instance_profile_credentials_timeout: 1,
+ instance_profile_credentials_retries: 5,
+ )
+ end
+ end
+
# Initializes a new adapter, with access to the SQS queue at the given URL.
#
# @param queue_url[String] the SQS queue URL (the URL can be copied from your AWS console)
- def initialize(queue_url)
- require 'aws-sdk-sqs'
+ def initialize(queue_url, client: self.class.client)
@queue_url = queue_url
+ @client = client
end
# Receive at most 10 messages from the queue, and return the array of Message objects. Retries for at
# most 900 seconds (15 minutes) and then gives up, thereby crashing the read loop. If SQS is not available
# even after 15 minutes it is either down or the server is misconfigured. Either way it makes no sense to
# continue.
#
# @return [Array<Message>] an array of Message objects
def receive_messages
Retriable.retriable on: network_and_aws_sdk_errors, tries: MAX_RANDOM_RECEIVE_FAILURES do
- response = client.receive_message(
+ response = @client.receive_message(
queue_url: @queue_url,
attribute_names: ['All'],
wait_time_seconds: DEFAULT_TIMEOUT_SECONDS,
max_number_of_messages: BATCH_RECEIVE_SIZE
)
@@ -195,30 +208,18 @@
[NotOurFaultAwsError, Seahorse::Client::NetworkingError, Aws::SQS::Errors::InternalError]
end
def handle_batch_with_retries(method, batch)
Retriable.retriable on: network_and_aws_sdk_errors, tries: MAX_RANDOM_FAILURES_PER_CALL do
- resp = client.send(method, queue_url: @queue_url, entries: batch)
+ resp = @client.send(method, queue_url: @queue_url, entries: batch)
wrong_messages, aws_failures = resp.failed.partition {|m| m.sender_fault }
if wrong_messages.any?
err = wrong_messages.inspect + ', ' + resp.inspect
raise "#{wrong_messages.length} messages failed while doing #{method.to_s} with error: #{err}"
elsif aws_failures.any?
# We set the 'batch' param to an array with only the failed messages so only those get retried
batch = aws_failures.map {|aws_response_message| batch.find { |m| aws_response_message.id.to_s == m[:id] }}
raise NotOurFaultAwsError
end
end
- end
-
- def client
- # It's better using a singleton client to prevent making a lot of HTTP
- # requests to the AWS metadata endpoint when getting credentials.
- # Maybe in the future, we can remove @client and use Storm.client only.
- return Sqewer.client if Sqewer.client
-
- @client ||= Aws::SQS::Client.new(
- instance_profile_credentials_timeout: 1, # defaults to 1 second
- instance_profile_credentials_retries: 5, # defaults to 0 retries
- )
end
end