lib/active_support/notifications/fanout.rb in activesupport-3.0.0.beta vs lib/active_support/notifications/fanout.rb in activesupport-3.0.0.beta2
- old
+ new
@@ -1,26 +1,38 @@
module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications. It
- # just pushes events to all registered subscribers.
+ # just pushes events to all registered log subscribers.
class Fanout
def initialize
@subscribers = []
+ @listeners_for = {}
end
def bind(pattern)
Binding.new(self, pattern)
end
def subscribe(pattern = nil, &block)
+ @listeners_for.clear
@subscribers << Subscriber.new(pattern, &block)
+ @subscribers.last
end
- def publish(*args)
- @subscribers.each { |s| s.publish(*args) }
+ def unsubscribe(subscriber)
+ @subscribers.delete(subscriber)
+ @listeners_for.clear
end
+ def publish(name, *args)
+ if listeners = @listeners_for[name]
+ listeners.each { |s| s.publish(name, *args) }
+ else
+ @listeners_for[name] = @subscribers.select { |s| s.publish(name, *args) }
+ end
+ end
+
# This is a sync queue, so there is not waiting.
def wait
end
# Used for internal implementation only.
@@ -30,11 +42,11 @@
@pattern =
case pattern
when Regexp, NilClass
pattern
else
- /^#{Regexp.escape(pattern.to_s)}/
+ /^#{Regexp.escape(pattern.to_s)}$/
end
end
def subscribe(&block)
@queue.subscribe(@pattern, &block)
@@ -46,10 +58,12 @@
@pattern = pattern
@block = block
end
def publish(*args)
- push(*args) if matches?(args.first)
+ return unless matches?(args.first)
+ push(*args)
+ true
end
def drained?
true
end