Sha256: 7b26af1beb69b71553ecf050564f3892e034e1070ac296ee29717ba50573106f

Contents?: true

Size: 1.93 KB

Versions: 7

Compression:

Stored size: 1.93 KB

Contents

# ActiveJob docs: http://edgeguides.rubyonrails.org/active_job_basics.html
# Example adapters ref: https://github.com/rails/rails/tree/master/activejob/lib/active_job/queue_adapters

require 'shoryuken'

module ActiveJob
  module QueueAdapters
    # == Shoryuken adapter for Active Job
    #
    # Shoryuken ("sho-ryu-ken") is a super-efficient AWS SQS thread based message processor.
    #
    # Read more about Shoryuken {here}[https://github.com/phstc/shoryuken].
    #
    # To use Shoryuken set the queue_adapter config to +:shoryuken+.
    #
    #   Rails.application.config.active_job.queue_adapter = :shoryuken
    class ShoryukenAdapter
      class << self
        def enqueue(job) #:nodoc:
          register_worker!(job)

          queue = Shoryuken::Client.queues(job.queue_name)
          queue.send_message(message(job))
        end

        def enqueue_at(job, timestamp) #:nodoc:
          register_worker!(job)

          delay = (timestamp - Time.current.to_f).round
          raise 'The maximum allowed delay is 15 minutes' if delay > 15.minutes

          queue = Shoryuken::Client.queues(job.queue_name)
          queue.send_message(message(job, delay_seconds: delay))
        end

        private

        def message(job, options = {})
          body = job.serialize

          { message_body: body,
            message_attributes: message_attributes }.merge(options)
        end

        def register_worker!(job)
          Shoryuken.register_worker(job.queue_name, JobWrapper)
        end

        def message_attributes
          @message_attributes ||= {
            'shoryuken_class' => {
              string_value: JobWrapper.to_s,
              data_type: 'String'
            }
          }
        end
      end

      class JobWrapper #:nodoc:
        include Shoryuken::Worker

        shoryuken_options body_parser: :json, auto_delete: true

        def perform(sqs_msg, hash)
          Base.execute hash
        end
      end
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
shoryuken-2.0.3 lib/shoryuken/extensions/active_job_adapter.rb
shoryuken-2.0.2 lib/shoryuken/extensions/active_job_adapter.rb
shoryuken-2.0.1 lib/shoryuken/extensions/active_job_adapter.rb
shoryuken-2.0.0 lib/shoryuken/extensions/active_job_adapter.rb
shoryuken-1.0.3 lib/shoryuken/extensions/active_job_adapter.rb
shoryuken-1.0.2 lib/shoryuken/extensions/active_job_adapter.rb
shoryuken-1.0.1 lib/shoryuken/extensions/active_job_adapter.rb