lib/active_support/notifications/fanout.rb in activesupport-7.0.0.alpha2 vs lib/active_support/notifications/fanout.rb in activesupport-7.0.0.rc1
- old
+ new
@@ -5,10 +5,20 @@
require "set"
require "active_support/core_ext/object/try"
module ActiveSupport
module Notifications
+ class InstrumentationSubscriberError < RuntimeError
+ attr_reader :exceptions
+
+ def initialize(exceptions)
+ @exceptions = exceptions
+ exception_class_names = exceptions.map { |e| e.class.name }
+ super "Exception(s) occurred within instrumentation subscribers: #{exception_class_names.join(', ')}"
+ end
+ end
+
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
#
# This class is thread safe. All methods are reentrant.
class Fanout
@@ -57,25 +67,46 @@
end
end
end
def start(name, id, payload)
- listeners_for(name).each { |s| s.start(name, id, payload) }
+ iterate_guarding_exceptions(listeners_for(name)) { |s| s.start(name, id, payload) }
end
def finish(name, id, payload, listeners = listeners_for(name))
- listeners.each { |s| s.finish(name, id, payload) }
+ iterate_guarding_exceptions(listeners) { |s| s.finish(name, id, payload) }
end
def publish(name, *args)
- listeners_for(name).each { |s| s.publish(name, *args) }
+ iterate_guarding_exceptions(listeners_for(name)) { |s| s.publish(name, *args) }
end
def publish_event(event)
- listeners_for(event.name).each { |s| s.publish_event(event) }
+ iterate_guarding_exceptions(listeners_for(event.name)) { |s| s.publish_event(event) }
end
+ def iterate_guarding_exceptions(listeners)
+ exceptions = nil
+
+ listeners.each do |s|
+ yield s
+ rescue Exception => e
+ exceptions ||= []
+ exceptions << e
+ end
+
+ if exceptions
+ if exceptions.size == 1
+ raise exceptions.first
+ else
+ raise InstrumentationSubscriberError.new(exceptions), cause: exceptions.first
+ end
+ end
+
+ listeners
+ end
+
def listeners_for(name)
# this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
@listeners_for[name] || synchronize do
# use synchronisation when accessing @subscribers
@listeners_for[name] ||=
@@ -106,27 +137,24 @@
if procish.arity == 1 && procish.parameters.length == 1
subscriber_class = EventObject
end
end
- wrap_all pattern, subscriber_class.new(pattern, listener)
+ subscriber_class.new(pattern, listener)
end
- def self.wrap_all(pattern, subscriber)
- unless pattern
- AllMessages.new(subscriber)
- else
- subscriber
- end
- end
-
class Matcher # :nodoc:
attr_reader :pattern, :exclusions
def self.wrap(pattern)
- return pattern if String === pattern
- new(pattern)
+ if String === pattern
+ pattern
+ elsif pattern.nil?
+ AllMessages.new
+ else
+ new(pattern)
+ end
end
def initialize(pattern)
@pattern = pattern
@exclusions = Set.new
@@ -137,10 +165,20 @@
end
def ===(name)
pattern === name && !exclusions.include?(name)
end
+
+ class AllMessages
+ def ===(name)
+ true
+ end
+
+ def unsubscribe!(*)
+ false
+ end
+ end
end
class Evented # :nodoc:
attr_reader :pattern
@@ -175,14 +213,10 @@
def subscribed_to?(name)
pattern === name
end
- def matches?(name)
- pattern && pattern === name
- end
-
def unsubscribe!(name)
pattern.unsubscribe!(name)
end
end
@@ -190,16 +224,16 @@
def publish(name, *args)
@delegate.call name, *args
end
def start(name, id, payload)
- timestack = Thread.current[:_timestack] ||= []
+ timestack = IsolatedExecutionState[:_timestack] ||= []
timestack.push Time.now
end
def finish(name, id, payload)
- timestack = Thread.current[:_timestack]
+ timestack = IsolatedExecutionState[:_timestack]
started = timestack.pop
@delegate.call(name, started, Time.now, id, payload)
end
end
@@ -207,31 +241,31 @@
def publish(name, *args)
@delegate.call name, *args
end
def start(name, id, payload)
- timestack = Thread.current[:_timestack_monotonic] ||= []
- timestack.push Concurrent.monotonic_time
+ timestack = IsolatedExecutionState[:_timestack_monotonic] ||= []
+ timestack.push Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
def finish(name, id, payload)
- timestack = Thread.current[:_timestack_monotonic]
+ timestack = IsolatedExecutionState[:_timestack_monotonic]
started = timestack.pop
- @delegate.call(name, started, Concurrent.monotonic_time, id, payload)
+ @delegate.call(name, started, Process.clock_gettime(Process::CLOCK_MONOTONIC), id, payload)
end
end
class EventObject < Evented
def start(name, id, payload)
- stack = Thread.current[:_event_stack] ||= []
+ stack = IsolatedExecutionState[:_event_stack] ||= []
event = build_event name, id, payload
event.start!
stack.push event
end
def finish(name, id, payload)
- stack = Thread.current[:_event_stack]
+ stack = IsolatedExecutionState[:_event_stack]
event = stack.pop
event.payload = payload
event.finish!
@delegate.call event
end
@@ -242,37 +276,9 @@
private
def build_event(name, id, payload)
ActiveSupport::Notifications::Event.new name, nil, nil, id, payload
end
- end
-
- class AllMessages # :nodoc:
- def initialize(delegate)
- @delegate = delegate
- end
-
- def start(name, id, payload)
- @delegate.start name, id, payload
- end
-
- def finish(name, id, payload)
- @delegate.finish name, id, payload
- end
-
- def publish(name, *args)
- @delegate.publish name, *args
- end
-
- def subscribed_to?(name)
- true
- end
-
- def unsubscribe!(*)
- false
- end
-
- alias :matches? :===
end
end
end
end
end