Sha256: 75fad6e499e373f72e6605120520ce1fdb17e76a1d93d93ce09c5db13af4773e

Contents?: true

Size: 1.13 KB

Versions: 12

Compression:

Stored size: 1.13 KB

Contents

module QueueBus
  # Fans out an event to multiple queues: every subscription that matches
  # the event's attributes gets its own enqueued job.
  class Driver

    class << self
      # Collects the subscriptions matching +attributes+ from every
      # application returned by Application.all.
      #
      # @param attributes [Hash] the event attributes to match against
      # @return [Array] matching subscriptions, in application order
      def subscription_matches(attributes)
        Application.all.flat_map { |app| app.subscription_matches(attributes) }
      end

      # Enqueues one job per matching subscription. Each job receives the
      # event attributes plus "bus_driven_at" / "bus_rider_*" metadata that
      # describes the subscription it is being delivered for.
      #
      # @param attributes [Hash] the event attributes; must not be nil or empty
      # @raise [RuntimeError] "No attributes passed" when +attributes+ is nil
      #   or empty
      # @return [Array] the matched subscriptions (the receiver of +each+)
      def perform(attributes={})
        # A nil payload is rejected with the same error as an empty one,
        # rather than failing with NoMethodError on nil.empty?.
        raise "No attributes passed" if attributes.nil? || attributes.empty?

        ::QueueBus.log_worker("Driver running: #{attributes.inspect}")

        subscription_matches(attributes).each do |sub|
          ::QueueBus.log_worker("  ...sending to #{sub.queue_name} queue with class #{sub.class_name} for app #{sub.app_key} because of subscription: #{sub.key}")

          bus_attr = {  "bus_driven_at" => Time.now.to_i,
                        "bus_rider_queue" => sub.queue_name,
                        "bus_rider_app_key" => sub.app_key,
                        "bus_rider_sub_key" => sub.key,
                        "bus_rider_class_name" => sub.class_name}
          # Event attributes are merged last, so they take precedence over
          # the metadata above when a key collides.
          bus_attr = bus_attr.merge(attributes)
          ::QueueBus.enqueue_to(sub.queue_name, sub.class_name, bus_attr)
        end
      end
    end

  end
end

Version data entries

12 entries across 12 versions & 1 rubygem

Version Path
queue-bus-0.9.0 lib/queue_bus/driver.rb
queue-bus-0.8.1 lib/queue_bus/driver.rb
queue-bus-0.8.0 lib/queue_bus/driver.rb
queue-bus-0.7.0 lib/queue_bus/driver.rb
queue-bus-0.6.0 lib/queue_bus/driver.rb
queue-bus-0.5.9 lib/queue_bus/driver.rb
queue-bus-0.5.8 lib/queue_bus/driver.rb
queue-bus-0.5.7 lib/queue_bus/driver.rb
queue-bus-0.5.6 lib/queue_bus/driver.rb
queue-bus-0.5.5 lib/queue_bus/driver.rb
queue-bus-0.5.4 lib/queue_bus/driver.rb
queue-bus-0.5.3 lib/queue_bus/driver.rb