Sha256: 21427f17809155a1ec9fbe98f781cfc3910fb9145b31dfeea9d809e49bdcdfae

Contents?: true

Size: 987 Bytes

Versions: 2

Compression:

Stored size: 987 Bytes

Contents

module ResqueBus
  # fans out an event to multiple queues
  class Driver

    def self.subscription_matches(attributes)
      out = []
      Application.all.each do |app|
        subs = app.subscription_matches(attributes)
        out.concat(subs)
      end
      out
    end

    def self.perform(attributes={})
      raise "No attribiutes passed" if attributes.empty?

      ResqueBus.log_worker("Driver running: #{attributes.inspect}")

      subscription_matches(attributes).each do |sub|
        ResqueBus.log_worker("  ...sending to #{sub.queue_name} queue with class #{sub.class_name} for app #{sub.app_key} because of subscription: #{sub.key}")
        
        bus_attr = {"bus_driven_at" => Time.now.to_i, "bus_rider_queue" => sub.queue_name, "bus_rider_app_key" => sub.app_key, "bus_rider_sub_key" => sub.key, "bus_rider_class_name" => sub.class_name}
        ResqueBus.enqueue_to(sub.queue_name, sub.class_name, bus_attr.merge(attributes || {}))
      end
    end

  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
resque-bus-0.2.4 lib/resque_bus/driver.rb
resque-bus-0.2.3 lib/resque_bus/driver.rb