Sha256: d862e1eb8abe9ae088cefed1f6a5c7af0fffa4cf3944687de98386b8c0243394

Contents?: true

Size: 1.1 KB

Versions: 2

Compression:

Stored size: 1.1 KB

Contents

module ResqueBus
  # Delivers bus events to the subscriptions known to this process
  # (only local queues are processed).
  class Local

    # Hands +attributes+ to every locally registered subscription that matches.
    #
    # Each delivery is the bus metadata for that subscription (drive time,
    # queue name, app key, subscription key, class name) merged with
    # +attributes+; on a key conflict the value from +attributes+ wins.
    #
    # Delivery depends on ResqueBus.local_mode:
    # * :standalone - the payload is enqueued through ResqueBus.enqueue_to
    # * anything else - inline mode (the default): the subscription is
    #   executed directly via +execute!+
    #
    # @param attributes [Hash, nil] event attributes; nil is treated as an
    #   empty hash when the payload is built
    # @return the collection of matched subscriptions that was iterated
    def self.perform(attributes = {})
      ResqueBus.log_worker("Local running: #{attributes.inspect}")

      # looking for subscriptions, not queues
      subscription_matches(attributes).each do |sub|
        bus_attr = {
          "bus_driven_at" => Time.now.to_i,
          "bus_rider_queue" => sub.queue_name,
          "bus_rider_app_key" => sub.app_key,
          "bus_rider_sub_key" => sub.key,
          "bus_rider_class_name" => sub.class_name
        }
        # caller-supplied attributes override the generated bus metadata
        to_publish = bus_attr.merge(attributes || {})

        if ResqueBus.local_mode == :standalone
          ResqueBus.enqueue_to(sub.queue_name, sub.class_name, to_publish)
        else
          # defaults to inline mode
          sub.execute!(to_publish)
        end
      end
    end

    # Collects the matching subscriptions from every dispatcher.
    #
    # Looks directly at the subscriptions loaded into the dispatchers,
    # so we don't need a redis server up.
    #
    # @param attributes [Hash, nil] event attributes forwarded to each
    #   dispatcher's +subscription_matches+
    # @return [Array] matches from all dispatchers, in dispatcher order
    def self.subscription_matches(attributes)
      ResqueBus.dispatchers.flat_map do |dispatcher|
        dispatcher.subscription_matches(attributes)
      end
    end

  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
resque-bus-0.2.4 lib/resque_bus/local.rb
resque-bus-0.2.3 lib/resque_bus/local.rb