Sha256: 10b3cddd3d51f5232ad5266f6c0cf2502b6f1dc4b3ff298a6f5908cb03deee82

Contents?: true

Size: 1.93 KB

Versions: 18

Compression:

Stored size: 1.93 KB

Contents

# ActiveJob docs: http://edgeguides.rubyonrails.org/active_job_basics.html
# Example adapters ref: https://github.com/rails/rails/tree/master/activejob/lib/active_job/queue_adapters
module ActiveJob
  module QueueAdapters
    # == Shoryuken concurrent adapter for Active Job
    #
    # This adapter sends messages asynchronously (ie non-blocking) and allows
    # the caller to set up handlers for both success and failure
    #
    # To use this adapter, set up as:
    #
    # success_handler = ->(response, job, options) { StatsD.increment("#{job.class.name}.success") }
    # error_handler = ->(err, job, options) { StatsD.increment("#{job.class.name}.failure") }
    #
    # adapter = ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter.new(success_handler, error_handler)
    #
    # config.active_job.queue_adapter = adapter
    class ShoryukenConcurrentSendAdapter < ShoryukenAdapter
      def initialize(success_handler = nil, error_handler = nil)
        @success_handler = success_handler
        @error_handler = error_handler
      end

      def enqueue(job, options = {})
        send_concurrently(job, options) { |f_job, f_options| super(f_job, f_options) }
      end

      def success_handler
        @success_handler ||= ->(_send_message_response, _job, _options) { nil }
      end

      def error_handler
        @error_handler ||= begin
          lambda { |error, job, _options|
            Shoryuken.logger.warn("Failed to enqueue job: #{job.inspect} due to error: #{error}")
          }
        end
      end

      private

      def send_concurrently(job, options)
        Concurrent::Promises
          .future(job, options) { |f_job, f_options| [yield(f_job, f_options), f_job, f_options] }
          .then { |send_message_response, f_job, f_options| success_handler.call(send_message_response, f_job, f_options) }
          .rescue(job, options) { |err, f_job, f_options| error_handler.call(err, f_job, f_options) }
      end
    end
  end
end

Version data entries

18 entries across 18 versions & 1 rubygems

Version Path
shoryuken-6.2.1 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-6.2.0 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-6.1.1 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-6.1.0 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-6.0.0 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.3.2 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.3.1 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.3.0 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.2.3 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.2.2 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.2.1 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.2.0 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.1.1 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.1.0 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.0.6 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.0.5 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.0.4 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb
shoryuken-5.0.3 lib/shoryuken/extensions/active_job_concurrent_send_adapter.rb