Sha256: 516d90c0075e981bb73d563b8280c55b89e42dd55ebf38cc70f8b21682ecb201

Contents?: true

Size: 1.24 KB

Versions: 5

Compression:

Stored size: 1.24 KB

Contents

module Shoryuken
  # In-memory worker registry that maps each queue to a single worker class.
  #
  # Registering another worker for a queue that already has one replaces the
  # previous entry, unless either worker declares 'batch' => true, in which
  # case the registration is rejected.
  class DefaultWorkerRegistry < WorkerRegistry
    def initialize
      # queue => worker class
      @workers = {}
    end

    # Tells whether the worker registered for +queue+ has a truthy 'batch'
    # entry in its get_shoryuken_options.
    #
    # @param queue [Object] key the worker was registered under
    # @return [Boolean] false when nothing is registered for +queue+
    def batch_receive_messages?(queue)
      worker_class = @workers[queue]
      return false unless worker_class

      worker_class.get_shoryuken_options['batch'] ? true : false
    end

    # Removes every registered worker.
    def clear
      @workers.clear
    end

    # Builds a new worker instance for +message+.
    #
    # A single (non-Array) message may name its worker class through the
    # 'shoryuken_class' message attribute. When that name resolves to a
    # constant it wins; otherwise the worker registered for +queue+ is used.
    #
    # NOTE(review): the class name is taken from the message itself, so any
    # constant that can be resolved may be instantiated here - confirm that
    # everything able to publish to the queue is trusted.
    #
    # @param queue [Object] key used to look up the fallback worker
    # @param message [Object, Array] one message, or an Array of messages
    # @return [Object] a new instance of the resolved worker class
    # @raise [NoMethodError] when neither the message nor the registry
    #   yields a worker class (+new+ is then sent to nil)
    def fetch_worker(queue, message)
      worker_class = resolve_worker_class(requested_worker_name(message)) || @workers[queue]

      worker_class.new
    end

    # @return [Array] every queue that currently has a registered worker
    def queues
      @workers.keys
    end

    # Registers +clazz+ as the worker for +queue+, replacing any previous one.
    #
    # @param queue [Object] key to register the worker under
    # @param clazz [Class] worker class responding to get_shoryuken_options
    # @return [Class] +clazz+
    # @raise [ArgumentError] when +queue+ already has a worker and either
    #   that worker or +clazz+ has its 'batch' option set to exactly true
    def register_worker(queue, clazz)
      worker_class = @workers[queue]

      # The strict `== true` comparison is kept on purpose: loosening it to
      # truthiness would reject registrations that are accepted today.
      if worker_class &&
         (worker_class.get_shoryuken_options['batch'] == true || clazz.get_shoryuken_options['batch'] == true)
        raise ArgumentError, "Could not register #{clazz} for '#{queue}', "\
          "because #{worker_class} is already registered for this queue, "\
          "and Shoryuken doesn't support a batchable worker for a queue with multiple workers"
      end

      @workers[queue] = clazz
    end

    # @param queue [Object] key the worker was registered under
    # @return [Array] the worker registered for +queue+ wrapped in a flat
    #   Array, or an empty Array when there is none
    def workers(queue)
      [@workers.fetch(queue, [])].flatten
    end

    private

    # Reads the worker class name carried by a single message.
    # Returns nil for an Array of messages or when the attribute is absent.
    def requested_worker_name(message)
      return if message.is_a?(Array)

      attributes = message.message_attributes
      attribute = attributes && attributes['shoryuken_class']
      attribute && attribute[:string_value]
    end

    # Turns +name+ into a constant. Returns nil when +name+ cannot be
    # constantized or does not resolve; only NameError is rescued so that
    # unrelated failures are no longer silently discarded.
    def resolve_worker_class(name)
      return unless name.respond_to?(:constantize)

      name.constantize
    rescue NameError
      nil
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygem

Version Path
shoryuken-2.1.3 lib/shoryuken/default_worker_registry.rb
shoryuken-2.1.2 lib/shoryuken/default_worker_registry.rb
shoryuken-2.1.1 lib/shoryuken/default_worker_registry.rb
shoryuken-2.1.0 lib/shoryuken/default_worker_registry.rb
shoryuken-2.0.11 lib/shoryuken/default_worker_registry.rb