lib/active_support/notifications/fanout.rb in activesupport-3.0.0.beta vs lib/active_support/notifications/fanout.rb in activesupport-3.0.0.beta2

- old
+ new

@@ -1,26 +1,38 @@ module ActiveSupport module Notifications # This is a default queue implementation that ships with Notifications. It - # just pushes events to all registered subscribers. + # just pushes events to all registered log subscribers. class Fanout def initialize @subscribers = [] + @listeners_for = {} end def bind(pattern) Binding.new(self, pattern) end def subscribe(pattern = nil, &block) + @listeners_for.clear @subscribers << Subscriber.new(pattern, &block) + @subscribers.last end - def publish(*args) - @subscribers.each { |s| s.publish(*args) } + def unsubscribe(subscriber) + @subscribers.delete(subscriber) + @listeners_for.clear end + def publish(name, *args) + if listeners = @listeners_for[name] + listeners.each { |s| s.publish(name, *args) } + else + @listeners_for[name] = @subscribers.select { |s| s.publish(name, *args) } + end + end + # This is a sync queue, so there is not waiting. def wait end # Used for internal implementation only. @@ -30,11 +42,11 @@ @pattern = case pattern when Regexp, NilClass pattern else - /^#{Regexp.escape(pattern.to_s)}/ + /^#{Regexp.escape(pattern.to_s)}$/ end end def subscribe(&block) @queue.subscribe(@pattern, &block) @@ -46,10 +58,12 @@ @pattern = pattern @block = block end def publish(*args) - push(*args) if matches?(args.first) + return unless matches?(args.first) + push(*args) + true end def drained? true end