Sha256: 9b81b1ebe238924d06d1f48a4d6be13309f0417d90268ef09dd9d406e7eef107

Contents?: true

Size: 1.24 KB

Versions: 8

Compression:

Stored size: 1.24 KB

Contents

module Shoryuken
  # Worker registry backed by a plain Hash that maps each queue to the single
  # worker class most recently registered for it.
  class DefaultWorkerRegistry < WorkerRegistry
    # Starts with an empty queue => worker class mapping.
    def initialize
      @workers = {}
    end

    # Tells whether the worker registered for +queue+ has a truthy 'batch'
    # entry in the hash returned by its +get_shoryuken_options+.
    #
    # @param queue [Object] key the worker was registered under
    # @return [Boolean] false when nothing is registered for +queue+
    def batch_receive_messages?(queue)
      registered = @workers[queue]
      return false unless registered

      registered.get_shoryuken_options['batch'] ? true : false
    end

    # Removes every registration.
    #
    # @return [Hash] the now-empty internal mapping
    def clear
      @workers.clear
    end

    # Builds a new worker instance for +message+.
    #
    # When +message+ is not an Array and its +message_attributes+ carry a
    # 'shoryuken_class' entry with a +:string_value+, that value is
    # constantized and used as the worker class. In every other case,
    # including any StandardError raised while constantizing, the class
    # registered for +queue+ is used instead.
    #
    # @param queue [Object] key used to look up the fallback worker class
    # @param message [Object, Array] a single message or an Array of messages
    # @return [Object] the result of calling +new+ on the selected class
    # @raise [NoMethodError] when no class could be resolved and nothing is
    #   registered for +queue+ (+new+ ends up being sent to nil)
    def fetch_worker(queue, message)
      class_name = nil

      unless message.is_a?(Array)
        attributes = message.message_attributes
        class_entry = attributes && attributes['shoryuken_class']
        class_name = class_entry && class_entry[:string_value]
      end

      # A nil/false class_name fails here with NoMethodError, which is
      # rescued like any other StandardError so the queue's registered class
      # takes over.
      # NOTE(review): +constantize+ is not core Ruby; presumably supplied by
      # ActiveSupport or a project core extension - confirm.
      resolved =
        begin
          class_name.constantize
        rescue StandardError
          nil
        end

      (resolved || @workers[queue]).new
    end

    # @return [Array] every queue that currently has a worker registered
    def queues
      @workers.keys
    end

    # Registers +clazz+ as the worker for +queue+, replacing any class that
    # was registered for it before.
    #
    # @param queue [Object] key to register the worker under
    # @param clazz [Class] worker class; must respond to +get_shoryuken_options+
    #   when +queue+ already has a worker
    # @return [Class] +clazz+
    # @raise [ArgumentError] when +queue+ already has a worker and either that
    #   worker or +clazz+ has its 'batch' option set to exactly +true+
    def register_worker(queue, clazz)
      existing = @workers[queue]

      if existing && [existing, clazz].any? { |klass| klass.get_shoryuken_options['batch'] == true }
        raise ArgumentError, "Could not register #{clazz} for '#{queue}', "\
          "because #{existing} is already registered for this queue, "\
          "and Shoryuken doesn't support a batchable worker for a queue with multiple workers."
      end

      @workers[queue] = clazz
    end

    # Lists the worker classes registered for +queue+.
    #
    # @param queue [Object] key the worker was registered under
    # @return [Array] the registered class wrapped in an Array, or an empty
    #   Array when +queue+ has no registration
    def workers(queue)
      registered = @workers.fetch(queue) { [] }
      [registered].flatten
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
shoryuken-2.0.4 lib/shoryuken/default_worker_registry.rb
shoryuken-2.0.3 lib/shoryuken/default_worker_registry.rb
shoryuken-2.0.2 lib/shoryuken/default_worker_registry.rb
shoryuken-2.0.1 lib/shoryuken/default_worker_registry.rb
shoryuken-2.0.0 lib/shoryuken/default_worker_registry.rb
shoryuken-1.0.3 lib/shoryuken/default_worker_registry.rb
shoryuken-1.0.2 lib/shoryuken/default_worker_registry.rb
shoryuken-1.0.1 lib/shoryuken/default_worker_registry.rb