Sha256: 75fad6e499e373f72e6605120520ce1fdb17e76a1d93d93ce09c5db13af4773e
Contents?: true
Size: 1.13 KB
Versions: 12
Compression:
Stored size: 1.13 KB
Contents
module QueueBus
  # Fans out an event to multiple queues.
  #
  # For every subscription (across all applications) that matches the event
  # attributes, one job is enqueued on that subscription's queue.
  class Driver
    class << self
      # Collects the matching subscriptions from every application.
      #
      # @param attributes [Hash] event attributes to match against
      # @return [Array] matches from all applications, in application order
      def subscription_matches(attributes)
        Application.all.flat_map { |app| app.subscription_matches(attributes) }
      end

      # Enqueues the event once per matching subscription.
      #
      # @param attributes [Hash, nil] event attributes
      # @raise [RuntimeError] when attributes is nil or empty
      # @return [Array] the subscriptions the event was sent to
      def perform(attributes = {})
        # Check nil first: calling #empty? on nil would raise NoMethodError
        # instead of the intended error message.
        raise "No attributes passed" if attributes.nil? || attributes.empty?

        ::QueueBus.log_worker("Driver running: #{attributes.inspect}")

        subscription_matches(attributes).each do |sub|
          ::QueueBus.log_worker(" ...sending to #{sub.queue_name} queue with class #{sub.class_name} for app #{sub.app_key} because of subscription: #{sub.key}")

          bus_attr = {
            "bus_driven_at" => Time.now.to_i,
            "bus_rider_queue" => sub.queue_name,
            "bus_rider_app_key" => sub.app_key,
            "bus_rider_sub_key" => sub.key,
            "bus_rider_class_name" => sub.class_name
          }

          # Event attributes are merged last, so a key supplied by the caller
          # wins over the routing metadata built above.
          ::QueueBus.enqueue_to(sub.queue_name, sub.class_name, bus_attr.merge(attributes))
        end
      end
    end
  end
end
Version data entries
12 entries across 12 versions & 1 rubygems