lib/sqewer/connection.rb in sqewer-7.0.0 vs lib/sqewer/connection.rb in sqewer-8.0.0

- old
+ new

@@ -37,27 +37,40 @@ end rescue KeyError => e raise "SQS_QUEUE_URL not set in the environment. This is the queue URL Sqewer uses by default." end + # Returns a singleton of Aws::SQS::Client + def self.client + # It's better using a singleton client to prevent making a lot of HTTP + # requests to the AWS metadata endpoint when getting credentials. + @client ||= begin + require 'aws-sdk-sqs' + ::Aws::SQS::Client.new( + instance_profile_credentials_timeout: 1, + instance_profile_credentials_retries: 5, + ) + end + end + # Initializes a new adapter, with access to the SQS queue at the given URL. # # @param queue_url[String] the SQS queue URL (the URL can be copied from your AWS console) - def initialize(queue_url) - require 'aws-sdk-sqs' + def initialize(queue_url, client: self.class.client) @queue_url = queue_url + @client = client end # Receive at most 10 messages from the queue, and return the array of Message objects. Retries for at # most 900 seconds (15 minutes) and then gives up, thereby crashing the read loop. If SQS is not available # even after 15 minutes it is either down or the server is misconfigured. Either way it makes no sense to # continue. # # @return [Array<Message>] an array of Message objects def receive_messages Retriable.retriable on: network_and_aws_sdk_errors, tries: MAX_RANDOM_RECEIVE_FAILURES do - response = client.receive_message( + response = @client.receive_message( queue_url: @queue_url, attribute_names: ['All'], wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: BATCH_RECEIVE_SIZE ) @@ -195,30 +208,18 @@ [NotOurFaultAwsError, Seahorse::Client::NetworkingError, Aws::SQS::Errors::InternalError] end def handle_batch_with_retries(method, batch) Retriable.retriable on: network_and_aws_sdk_errors, tries: MAX_RANDOM_FAILURES_PER_CALL do - resp = client.send(method, queue_url: @queue_url, entries: batch) + resp = @client.send(method, queue_url: @queue_url, entries: batch) wrong_messages, aws_failures = resp.failed.partition {|m| m.sender_fault } if wrong_messages.any? err = wrong_messages.inspect + ', ' + resp.inspect raise "#{wrong_messages.length} messages failed while doing #{method.to_s} with error: #{err}" elsif aws_failures.any? # We set the 'batch' param to an array with only the failed messages so only those get retried batch = aws_failures.map {|aws_response_message| batch.find { |m| aws_response_message.id.to_s == m[:id] }} raise NotOurFaultAwsError end end - end - - def client - # It's better using a singleton client to prevent making a lot of HTTP - # requests to the AWS metadata endpoint when getting credentials. - # Maybe in the future, we can remove @client and use Storm.client only. - return Sqewer.client if Sqewer.client - - @client ||= Aws::SQS::Client.new( - instance_profile_credentials_timeout: 1, # defaults to 1 second - instance_profile_credentials_retries: 5, # defaults to 0 retries - ) end end