Sha256: abd3742485afb0415e18618f9b2eeb72b345d3736270648b4bf41635a37c9e98

Contents?: true

Size: 1.98 KB

Versions: 5

Compression:

Stored size: 1.98 KB

Contents

# frozen_string_literal: true

require 'aws-sdk-sqs'

module ActiveJob
  module QueueAdapters

    class AmazonSqsAdapter

      def enqueue(job)
        _enqueue(job)
      end

      def enqueue_at(job, timestamp)
        delay = (timestamp - Time.now.to_f).floor
        raise ArgumentError, 'Unable to queue a job with a delay great than 15 minutes' if delay > 15.minutes
        _enqueue(job, nil, delay_seconds: delay)
      end

      private

      def _enqueue(job, body = nil, send_message_opts = {})
        body ||= job.serialize
        queue_url = Aws::Rails::SqsActiveJob.config.queue_url_for(job.queue_name)
        send_message_opts[:queue_url] = queue_url
        send_message_opts[:message_body] = Aws::Json.dump(body)
        send_message_opts[:message_attributes] = message_attributes(job)

        if Aws::Rails::SqsActiveJob.fifo?(queue_url)
          # job_id is unique per initialization of job
          # Remove it from message dup id to ensure run-once behavior
          # with ActiveJob retries
          send_message_opts[:message_deduplication_id] =
            Digest::SHA256.hexdigest(Aws::Json.dump(body.except('job_id')))

          message_group_id = job.message_group_id if job.respond_to?(:message_group_id)
          message_group_id ||= Aws::Rails::SqsActiveJob.config.message_group_id

          send_message_opts[:message_group_id] = message_group_id
        end

        Aws::Rails::SqsActiveJob.config.client.send_message(send_message_opts)
      end

      def message_attributes(job)
        {
          'aws_sqs_active_job_class' => {
            string_value: job.class.to_s,
            data_type: 'String'
          },
          'aws_sqs_active_job_version' => {
            string_value: Aws::Rails::VERSION,
            data_type: 'String'
          }
        }
      end
    end

    # create an alias to allow `:amazon` to be used as the adapter name
    # `:amazon` is the convention used for ActionMailer and ActiveStorage
    AmazonAdapter = AmazonSqsAdapter
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
aws-sdk-rails-3.8.0 lib/active_job/queue_adapters/amazon_sqs_adapter.rb
aws-sdk-rails-3.7.1 lib/active_job/queue_adapters/amazon_sqs_adapter.rb
aws-sdk-rails-3.7.0 lib/active_job/queue_adapters/amazon_sqs_adapter.rb
aws-sdk-rails-3.6.4 lib/active_job/queue_adapters/amazon_sqs_adapter.rb
aws-sdk-rails-3.6.3 lib/active_job/queue_adapters/amazon_sqs_adapter.rb