Sha256: 5b74518be9c39fa158342945ec2c75fbd9106b8a82b170bd63b34a60bf023946
Contents?: true
Size: 1.38 KB
Versions: 4
Compression:
Stored size: 1.38 KB
Contents
module QueueBus
  # only process local queues
  class Local
    class << self
      def publish(attributes = {})
        if ::QueueBus.local_mode == :suppress
          ::QueueBus.log_worker("Suppressed: #{attributes.inspect}")
          return # not doing anything
        end

        ::QueueBus.log_worker("Local running: #{attributes.inspect}")

        # looking for subscriptions, not queues
        subscription_matches(attributes).each do |sub|
          bus_attr = {
            "bus_driven_at"        => Time.now.to_i,
            "bus_rider_queue"      => sub.queue_name,
            "bus_rider_app_key"    => sub.app_key,
            "bus_rider_sub_key"    => sub.key,
            "bus_rider_class_name" => sub.class_name
          }
          to_publish = bus_attr.merge(attributes || {})
          if ::QueueBus.local_mode == :standalone
            ::QueueBus.enqueue_to(sub.queue_name, sub.class_name, bus_attr.merge(attributes || {}))
          else # defaults to inline mode
            sub.execute!(to_publish)
          end
        end
      end

      # looking directly at subscriptions loaded into dispatcher
      # so we don't need redis server up
      def subscription_matches(attributes)
        out = []
        ::QueueBus.dispatchers.each do |dispatcher|
          out.concat(dispatcher.subscription_matches(attributes))
        end
        out
      end
    end
  end
end
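The three branches of `publish` can be hard to follow without a running bus, so here is a minimal standalone sketch of the same dispatch logic. It does not load queue-bus: `FakeSubscription`, `local_publish`, the `queue` array, and the symbol return values are hypothetical stand-ins introduced for illustration only. It mainly shows that caller attributes are merged last and so override the `bus_*` keys on collision.

```ruby
# Hypothetical stand-in for a subscription. The four fields mirror the ones
# read in Local.publish; this Struct is NOT part of the queue-bus gem.
FakeSubscription = Struct.new(:queue_name, :app_key, :key, :class_name) do
  # Records every attribute hash passed to execute!
  def executed
    @executed ||= []
  end

  def execute!(attributes)
    executed << attributes
  end
end

# Mirrors the branching in Local.publish. `queue` is a plain array standing
# in for whatever the :standalone branch would enqueue to (an assumption).
# The symbol return values are added for illustration only.
def local_publish(mode, subscriptions, attributes, queue)
  return :suppressed if mode == :suppress

  subscriptions.each do |sub|
    bus_attr = {
      "bus_driven_at"        => Time.now.to_i,
      "bus_rider_queue"      => sub.queue_name,
      "bus_rider_app_key"    => sub.app_key,
      "bus_rider_sub_key"    => sub.key,
      "bus_rider_class_name" => sub.class_name
    }
    # Hash#merge gives precedence to its argument, so caller-supplied
    # attributes win over the bus_* defaults on key collisions.
    to_publish = bus_attr.merge(attributes || {})

    if mode == :standalone
      queue << [sub.queue_name, sub.class_name, to_publish]
    else
      # Any other mode falls through to inline execution.
      sub.execute!(to_publish)
    end
  end
  :published
end

sub = FakeSubscription.new("default", "my_app", "user_created", "ExampleWorker")
queue = []
local_publish(:inline, [sub], { "id" => 42 }, queue)      # runs sub.execute!
local_publish(:standalone, [sub], { "id" => 43 }, queue)  # appends to queue
local_publish(:suppress, [sub], { "id" => 44 }, queue)    # does nothing
```

After these three calls, `sub.executed` holds one hash (from the inline call) and `queue` holds one entry (from the standalone call); the suppressed call leaves both untouched.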
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
queue-bus-0.5.6 | lib/queue_bus/local.rb |
queue-bus-0.5.5 | lib/queue_bus/local.rb |
queue-bus-0.5.4 | lib/queue_bus/local.rb |
queue-bus-0.5.3 | lib/queue_bus/local.rb |