lib/active_support/notifications/fanout.rb in activesupport-7.0.0.alpha2 vs lib/active_support/notifications/fanout.rb in activesupport-7.0.0.rc1

- old
+ new

@@ -5,10 +5,20 @@ require "set" require "active_support/core_ext/object/try" module ActiveSupport module Notifications + class InstrumentationSubscriberError < RuntimeError + attr_reader :exceptions + + def initialize(exceptions) + @exceptions = exceptions + exception_class_names = exceptions.map { |e| e.class.name } + super "Exception(s) occurred within instrumentation subscribers: #{exception_class_names.join(', ')}" + end + end + # This is a default queue implementation that ships with Notifications. # It just pushes events to all registered log subscribers. # # This class is thread safe. All methods are reentrant. class Fanout @@ -57,25 +67,46 @@ end end end def start(name, id, payload) - listeners_for(name).each { |s| s.start(name, id, payload) } + iterate_guarding_exceptions(listeners_for(name)) { |s| s.start(name, id, payload) } end def finish(name, id, payload, listeners = listeners_for(name)) - listeners.each { |s| s.finish(name, id, payload) } + iterate_guarding_exceptions(listeners) { |s| s.finish(name, id, payload) } end def publish(name, *args) - listeners_for(name).each { |s| s.publish(name, *args) } + iterate_guarding_exceptions(listeners_for(name)) { |s| s.publish(name, *args) } end def publish_event(event) - listeners_for(event.name).each { |s| s.publish_event(event) } + iterate_guarding_exceptions(listeners_for(event.name)) { |s| s.publish_event(event) } end + def iterate_guarding_exceptions(listeners) + exceptions = nil + + listeners.each do |s| + yield s + rescue Exception => e + exceptions ||= [] + exceptions << e + end + + if exceptions + if exceptions.size == 1 + raise exceptions.first + else + raise InstrumentationSubscriberError.new(exceptions), cause: exceptions.first + end + end + + listeners + end + def listeners_for(name) # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics) @listeners_for[name] || synchronize do # use synchronisation when accessing @subscribers @listeners_for[name] ||= @@ -106,27 +137,24 @@ if procish.arity == 1 && procish.parameters.length == 1 subscriber_class = EventObject end end - wrap_all pattern, subscriber_class.new(pattern, listener) + subscriber_class.new(pattern, listener) end - def self.wrap_all(pattern, subscriber) - unless pattern - AllMessages.new(subscriber) - else - subscriber - end - end - class Matcher # :nodoc: attr_reader :pattern, :exclusions def self.wrap(pattern) - return pattern if String === pattern - new(pattern) + if String === pattern + pattern + elsif pattern.nil? + AllMessages.new + else + new(pattern) + end end def initialize(pattern) @pattern = pattern @exclusions = Set.new @@ -137,10 +165,20 @@ end def ===(name) pattern === name && !exclusions.include?(name) end + + class AllMessages + def ===(name) + true + end + + def unsubscribe!(*) + false + end + end end class Evented # :nodoc: attr_reader :pattern @@ -175,14 +213,10 @@ def subscribed_to?(name) pattern === name end - def matches?(name) - pattern && pattern === name - end - def unsubscribe!(name) pattern.unsubscribe!(name) end end @@ -190,16 +224,16 @@ def publish(name, *args) @delegate.call name, *args end def start(name, id, payload) - timestack = Thread.current[:_timestack] ||= [] + timestack = IsolatedExecutionState[:_timestack] ||= [] timestack.push Time.now end def finish(name, id, payload) - timestack = Thread.current[:_timestack] + timestack = IsolatedExecutionState[:_timestack] started = timestack.pop @delegate.call(name, started, Time.now, id, payload) end end @@ -207,31 +241,31 @@ def publish(name, *args) @delegate.call name, *args end def start(name, id, payload) - timestack = Thread.current[:_timestack_monotonic] ||= [] - timestack.push Concurrent.monotonic_time + timestack = IsolatedExecutionState[:_timestack_monotonic] ||= [] + timestack.push Process.clock_gettime(Process::CLOCK_MONOTONIC) end def finish(name, id, payload) - timestack = Thread.current[:_timestack_monotonic] + timestack = IsolatedExecutionState[:_timestack_monotonic] started = timestack.pop - @delegate.call(name, started, Concurrent.monotonic_time, id, payload) + @delegate.call(name, started, Process.clock_gettime(Process::CLOCK_MONOTONIC), id, payload) end end class EventObject < Evented def start(name, id, payload) - stack = Thread.current[:_event_stack] ||= [] + stack = IsolatedExecutionState[:_event_stack] ||= [] event = build_event name, id, payload event.start! stack.push event end def finish(name, id, payload) - stack = Thread.current[:_event_stack] + stack = IsolatedExecutionState[:_event_stack] event = stack.pop event.payload = payload event.finish! @delegate.call event end @@ -242,37 +276,9 @@ private def build_event(name, id, payload) ActiveSupport::Notifications::Event.new name, nil, nil, id, payload end - end - - class AllMessages # :nodoc: - def initialize(delegate) - @delegate = delegate - end - - def start(name, id, payload) - @delegate.start name, id, payload - end - - def finish(name, id, payload) - @delegate.finish name, id, payload - end - - def publish(name, *args) - @delegate.publish name, *args - end - - def subscribed_to?(name) - true - end - - def unsubscribe!(*) - false - end - - alias :matches? :=== end end end end end