# frozen_string_literal: true require "mutex_m" require "concurrent/map" require "set" require "active_support/core_ext/object/try" module ActiveSupport module Notifications class InstrumentationSubscriberError < RuntimeError attr_reader :exceptions def initialize(exceptions) @exceptions = exceptions exception_class_names = exceptions.map { |e| e.class.name } super "Exception(s) occurred within instrumentation subscribers: #{exception_class_names.join(', ')}" end end module FanoutIteration # :nodoc: def iterate_guarding_exceptions(listeners) exceptions = nil listeners.each do |s| yield s rescue Exception => e exceptions ||= [] exceptions << e end if exceptions if exceptions.size == 1 raise exceptions.first else raise InstrumentationSubscriberError.new(exceptions), cause: exceptions.first end end listeners end end # This is a default queue implementation that ships with Notifications. # It just pushes events to all registered log subscribers. # # This class is thread safe. All methods are reentrant. class Fanout include Mutex_m def initialize @string_subscribers = Concurrent::Map.new { |h, k| h.compute_if_absent(k) { [] } } @other_subscribers = [] @all_listeners_for = Concurrent::Map.new @groups_for = Concurrent::Map.new @silenceable_groups_for = Concurrent::Map.new super end def inspect # :nodoc: total_patterns = @string_subscribers.size + @other_subscribers.size "#<#{self.class} (#{total_patterns} patterns)>" end def subscribe(pattern = nil, callable = nil, monotonic: false, &block) subscriber = Subscribers.new(pattern, callable || block, monotonic) synchronize do case pattern when String @string_subscribers[pattern] << subscriber clear_cache(pattern) when NilClass, Regexp @other_subscribers << subscriber clear_cache else raise ArgumentError, "pattern must be specified as a String, Regexp or empty" end end subscriber end def unsubscribe(subscriber_or_name) synchronize do case subscriber_or_name when String @string_subscribers[subscriber_or_name].clear clear_cache(subscriber_or_name) @other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) } else pattern = subscriber_or_name.try(:pattern) if String === pattern @string_subscribers[pattern].delete(subscriber_or_name) clear_cache(pattern) else @other_subscribers.delete(subscriber_or_name) clear_cache end end end end def clear_cache(key = nil) # :nodoc: if key @all_listeners_for.delete(key) @groups_for.delete(key) @silenceable_groups_for.delete(key) else @all_listeners_for.clear @groups_for.clear @silenceable_groups_for.clear end end class BaseGroup # :nodoc: include FanoutIteration def initialize(listeners, name, id, payload) @listeners = listeners end def each(&block) iterate_guarding_exceptions(@listeners, &block) end end class BaseTimeGroup < BaseGroup # :nodoc: def start(name, id, payload) @start_time = now end def finish(name, id, payload) stop_time = now each do |listener| listener.call(name, @start_time, stop_time, id, payload) end end end class MonotonicTimedGroup < BaseTimeGroup # :nodoc: private def now Process.clock_gettime(Process::CLOCK_MONOTONIC) end end class TimedGroup < BaseTimeGroup # :nodoc: private def now Time.now end end class EventedGroup < BaseGroup # :nodoc: def start(name, id, payload) each do |s| s.start(name, id, payload) end end def finish(name, id, payload) each do |s| s.finish(name, id, payload) end end end class EventObjectGroup < BaseGroup # :nodoc: def start(name, id, payload) @event = build_event(name, id, payload) @event.start! end def finish(name, id, payload) @event.payload = payload @event.finish! each do |s| s.call(@event) end end private def build_event(name, id, payload) ActiveSupport::Notifications::Event.new name, nil, nil, id, payload end end def groups_for(name) # :nodoc: groups = @groups_for.compute_if_absent(name) do all_listeners_for(name).reject(&:silenceable).group_by(&:group_class).transform_values do |s| s.map(&:delegate) end end silenceable_groups = @silenceable_groups_for.compute_if_absent(name) do all_listeners_for(name).select(&:silenceable).group_by(&:group_class).transform_values do |s| s.map(&:delegate) end end unless silenceable_groups.empty? groups = groups.dup silenceable_groups.each do |group_class, subscriptions| active_subscriptions = subscriptions.reject { |s| s.silenced?(name) } unless active_subscriptions.empty? groups[group_class] = (groups[group_class] || []) + active_subscriptions end end end groups end # A +Handle+ is used to record the start and finish time of event. # # Both #start and #finish must each be called exactly once. # # Where possible, it's best to use the block form: ActiveSupport::Notifications.instrument. # +Handle+ is a low-level API intended for cases where the block form can't be used. # # handle = ActiveSupport::Notifications.instrumenter.build_handle("my.event", {}) # begin # handle.start # # work to be instrumented # ensure # handle.finish # end class Handle def initialize(notifier, name, id, payload) # :nodoc: @name = name @id = id @payload = payload @groups = notifier.groups_for(name).map do |group_klass, grouped_listeners| group_klass.new(grouped_listeners, name, id, payload) end @state = :initialized end def start ensure_state! :initialized @state = :started @groups.each do |group| group.start(@name, @id, @payload) end end def finish finish_with_values(@name, @id, @payload) end def finish_with_values(name, id, payload) # :nodoc: ensure_state! :started @state = :finished @groups.each do |group| group.finish(name, id, payload) end end private def ensure_state!(expected) if @state != expected raise ArgumentError, "expected state to be #{expected.inspect} but was #{@state.inspect}" end end end include FanoutIteration def build_handle(name, id, payload) Handle.new(self, name, id, payload) end def start(name, id, payload) handle_stack = (IsolatedExecutionState[:_fanout_handle_stack] ||= []) handle = build_handle(name, id, payload) handle_stack << handle handle.start end def finish(name, id, payload, listeners = nil) handle_stack = IsolatedExecutionState[:_fanout_handle_stack] handle = handle_stack.pop handle.finish_with_values(name, id, payload) end def publish(name, *args) iterate_guarding_exceptions(listeners_for(name)) { |s| s.publish(name, *args) } end def publish_event(event) iterate_guarding_exceptions(listeners_for(event.name)) { |s| s.publish_event(event) } end def all_listeners_for(name) # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics) @all_listeners_for[name] || synchronize do # use synchronisation when accessing @subscribers @all_listeners_for[name] ||= @string_subscribers[name] + @other_subscribers.select { |s| s.subscribed_to?(name) } end end def listeners_for(name) all_listeners_for(name).reject { |s| s.silenced?(name) } end def listening?(name) all_listeners_for(name).any? { |s| !s.silenced?(name) } end # This is a sync queue, so there is no waiting. def wait end module Subscribers # :nodoc: def self.new(pattern, listener, monotonic) subscriber_class = monotonic ? MonotonicTimed : Timed if listener.respond_to?(:start) && listener.respond_to?(:finish) subscriber_class = Evented else # Doing this to detect a single argument block or callable # like `proc { |x| }` vs `proc { |*x| }`, `proc { |**x| }`, # or `proc { |x, **y| }` procish = listener.respond_to?(:parameters) ? listener : listener.method(:call) if procish.arity == 1 && procish.parameters.length == 1 subscriber_class = EventObject end end subscriber_class.new(pattern, listener) end class Matcher # :nodoc: attr_reader :pattern, :exclusions def self.wrap(pattern) if String === pattern pattern elsif pattern.nil? AllMessages.new else new(pattern) end end def initialize(pattern) @pattern = pattern @exclusions = Set.new end def unsubscribe!(name) exclusions << -name if pattern === name end def ===(name) pattern === name && !exclusions.include?(name) end class AllMessages def ===(name) true end def unsubscribe!(*) false end end end class Evented # :nodoc: attr_reader :pattern, :delegate, :silenceable def initialize(pattern, delegate) @pattern = Matcher.wrap(pattern) @delegate = delegate @silenceable = delegate.respond_to?(:silenced?) @can_publish = delegate.respond_to?(:publish) @can_publish_event = delegate.respond_to?(:publish_event) end def group_class EventedGroup end def publish(name, *args) if @can_publish @delegate.publish name, *args end end def publish_event(event) if @can_publish_event @delegate.publish_event event else publish(event.name, event.time, event.end, event.transaction_id, event.payload) end end def silenced?(name) @silenceable && @delegate.silenced?(name) end def subscribed_to?(name) pattern === name end def unsubscribe!(name) pattern.unsubscribe!(name) end end class Timed < Evented # :nodoc: def group_class TimedGroup end def publish(name, *args) @delegate.call name, *args end end class MonotonicTimed < Timed # :nodoc: def group_class MonotonicTimedGroup end end class EventObject < Evented def group_class EventObjectGroup end def publish_event(event) @delegate.call event end end end end end end