Sha256: a5e1bd66ef996f677ea9642239607189efe8c77ecc366e6ca18e0a7543ad472f

Contents?: true

Size: 1.16 KB

Versions: 10

Compression:

Stored size: 1.16 KB

Contents

module ResqueBus
  # only process local queues
  #
  # Delivers an event to the subscriptions registered on the dispatchers
  # loaded in this process, either by enqueueing a job per subscription
  # (:standalone mode) or by executing each subscription directly
  # (every other mode).
  class Local

    class << self
      # Publishes +attributes+ to every matching local subscription.
      #
      # For each match, a set of "bus_*" bookkeeping keys is generated and
      # merged with +attributes+; keys present in +attributes+ take
      # precedence over the generated ones.
      #
      # @param attributes [Hash, nil] event payload; nil is treated as {}
      #   when building the published payload
      # @return the value returned by iterating the matched subscriptions
      def perform(attributes = {})
        ResqueBus.log_worker("Local running: #{attributes.inspect}")

        # looking for subscriptions, not queues
        subscription_matches(attributes).each do |sub|
          bus_attr = {
            "bus_driven_at"        => Time.now.to_i,
            "bus_rider_queue"      => sub.queue_name,
            "bus_rider_app_key"    => sub.app_key,
            "bus_rider_sub_key"    => sub.key,
            "bus_rider_class_name" => sub.class_name
          }
          # caller-supplied attributes override the generated bus_* keys
          to_publish = bus_attr.merge(attributes || {})

          if ResqueBus.local_mode == :standalone
            ResqueBus.enqueue_to(sub.queue_name, sub.class_name, to_publish)
          else
            # defaults to inline mode: anything other than :standalone
            # (including :inline and nil) executes the subscription directly
            sub.execute!(to_publish)
          end
        end
      end

      # looking directly at subscriptions loaded into dispatcher
      # so we don't need redis server up
      #
      # @param attributes [Hash, nil] event payload passed to each dispatcher
      # @return [Array] matches from every dispatcher, concatenated in
      #   dispatcher order
      def subscription_matches(attributes)
        ResqueBus.dispatchers.flat_map do |dispatcher|
          dispatcher.subscription_matches(attributes)
        end
      end
    end

  end
end

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
resque-bus-0.3.4 lib/resque_bus/local.rb
resque-bus-0.3.3 lib/resque_bus/local.rb
resque-bus-0.3.2 lib/resque_bus/local.rb
resque-bus-0.3.1 lib/resque_bus/local.rb
resque-bus-0.3.0 lib/resque_bus/local.rb
resque-bus-0.2.10 lib/resque_bus/local.rb
resque-bus-0.2.9 lib/resque_bus/local.rb
resque-bus-0.2.8 lib/resque_bus/local.rb
resque-bus-0.2.7 lib/resque_bus/local.rb
resque-bus-0.2.6 lib/resque_bus/local.rb