lib/active_support/notifications/fanout.rb in activesupport-4.2.11.3 vs lib/active_support/notifications/fanout.rb in activesupport-5.0.0.beta1
- old
+ new
@@ -1,7 +1,7 @@
require 'mutex_m'
-require 'thread_safe'
+require 'concurrent/map'
module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
@@ -10,11 +10,11 @@
class Fanout
include Mutex_m
def initialize
@subscribers = []
- @listeners_for = ThreadSafe::Cache.new
+ @listeners_for = Concurrent::Map.new
super
end
def subscribe(pattern = nil, block = Proc.new)
subscriber = Subscribers.new pattern, block
@@ -40,19 +40,19 @@
def start(name, id, payload)
listeners_for(name).each { |s| s.start(name, id, payload) }
end
- def finish(name, id, payload)
- listeners_for(name).each { |s| s.finish(name, id, payload) }
+ def finish(name, id, payload, listeners = listeners_for(name))
+ listeners.each { |s| s.finish(name, id, payload) }
end
def publish(name, *args)
listeners_for(name).each { |s| s.publish(name, *args) }
end
def listeners_for(name)
- # this is correctly done double-checked locking (ThreadSafe::Cache's lookups have volatile semantics)
+ # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
@listeners_for[name] || synchronize do
# use synchronisation when accessing @subscribers
@listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
end
end