lib/active_support/notifications.rb in activesupport-3.0.pre vs lib/active_support/notifications.rb in activesupport-3.0.0.rc

- old
+ new

@@ -1,171 +1,78 @@ -require 'thread' require 'active_support/core_ext/module/delegation' -require 'active_support/core_ext/module/attribute_accessors' module ActiveSupport # Notifications provides an instrumentation API for Ruby. To instrument an # action in Ruby you just need to do: # # ActiveSupport::Notifications.instrument(:render, :extra => :information) do # render :text => "Foo" # end # # You can consume those events and the information they provide by registering - # a subscriber. For instance, let's store all instrumented events in an array: + # a log subscriber. For instance, let's store all instrumented events in an array: # # @events = [] # - # ActiveSupport::Notifications.subscribe do |event| - # @events << event + # ActiveSupport::Notifications.subscribe do |*args| + # @events << ActiveSupport::Notifications::Event.new(*args) # end # # ActiveSupport::Notifications.instrument(:render, :extra => :information) do # render :text => "Foo" # end # # event = @events.first - # event.class #=> ActiveSupport::Notifications::Event # event.name #=> :render - # event.duration #=> 10 (in miliseconds) - # event.result #=> "Foo" + # event.duration #=> 10 (in milliseconds) # event.payload #=> { :extra => :information } # # When subscribing to Notifications, you can pass a pattern, to only consume # events that match the pattern: # # ActiveSupport::Notifications.subscribe(/render/) do |event| # @render_events << event # end # # Notifications ships with a queue implementation that consumes and publish events - # to subscribers in a thread. You can use any queue implementation you want. + # to log subscribers in a thread. You can use any queue implementation you want. # module Notifications - mattr_accessor :queue + autoload :Instrumenter, 'active_support/notifications/instrumenter' + autoload :Event, 'active_support/notifications/instrumenter' + autoload :Fanout, 'active_support/notifications/fanout' + @instrumenters = Hash.new { |h,k| h[k] = notifier.listening?(k) } + class << self - delegate :instrument, :to => :instrumenter + attr_writer :notifier + delegate :publish, :to => :notifier - def instrumenter - Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher) + def instrument(name, payload = {}) + if @instrumenters[name] + instrumenter.instrument(name, payload) { yield payload if block_given? } + else + yield payload if block_given? + end end - def publisher - @publisher ||= Publisher.new(queue) - end - - def subscribe(pattern=nil, &block) - Subscriber.new(queue).bind(pattern).subscribe(&block) - end - end - - class Instrumenter - def initialize(publisher) - @publisher = publisher - end - - def instrument(name, payload={}) - payload[:time] = Time.now - payload[:thread_id] = Thread.current.object_id - payload[:result] = yield if block_given? - ensure - payload[:duration] = 1000 * (Time.now.to_f - payload[:time].to_f) - @publisher.publish(name, payload) - end - end - - class Publisher - def initialize(queue) - @queue = queue - end - - def publish(name, payload) - @queue.publish(name, payload) - end - end - - class Subscriber - def initialize(queue) - @queue = queue - end - - def bind(pattern) - @pattern = pattern - self - end - - def subscribe - @queue.subscribe(@pattern) do |name, payload| - yield Event.new(name, payload) + def subscribe(*args, &block) + notifier.subscribe(*args, &block).tap do + @instrumenters.clear end end - end - class Event - attr_reader :name, :time, :duration, :thread_id, :result, :payload - - def initialize(name, payload) - @name = name - @payload = payload.dup - @time = @payload.delete(:time) - @thread_id = @payload.delete(:thread_id) - @result = @payload.delete(:result) - @duration = @payload.delete(:duration) + def unsubscribe(*args) + notifier.unsubscribe(*args) + @instrumenters.clear end - def parent_of?(event) - start = (self.time - event.time) * 1000 - start <= 0 && (start + self.duration >= event.duration) + def notifier + @notifier ||= Fanout.new end - end - # This is a default queue implementation that ships with Notifications. It - # consumes events in a thread and publish them to all registered subscribers. - # - class LittleFanout - def initialize - @listeners, @stream = [], Queue.new - @thread = Thread.new { consume } + def instrumenter + Thread.current[:"instrumentation_#{notifier.object_id}"] ||= Instrumenter.new(notifier) end - - def publish(*event) - @stream.push(event) - end - - def subscribe(pattern=nil, &block) - @listeners << Listener.new(pattern, &block) - end - - def consume - while event = @stream.shift - @listeners.each { |l| l.publish(*event) } - end - end - - class Listener - attr_reader :thread - - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - @queue = Queue.new - @thread = Thread.new { consume } - end - - def publish(name, payload) - unless @pattern && !(@pattern === name.to_s) - @queue << [name, payload] - end - end - - def consume - while event = @queue.shift - @subscriber.call(*event) - end - end - end end end - - Notifications.queue = Notifications::LittleFanout.new end