lib/cabin/channel.rb in cabin-0.4.3 vs lib/cabin/channel.rb in cabin-0.4.4

- old
+ new

@@ -4,11 +4,12 @@ require "cabin/namespace" require "cabin/context" require "cabin/outputs/stdlib-logger" require "cabin/outputs/io" require "cabin/metrics" -require "logger" +require "logger" # stdlib +require "thread" # A wonderful channel for logging. # # You can log normal messages through here, but you should be really # shipping structured data. A message is just part of your data. @@ -44,22 +45,36 @@ # # I, [2011-10-11T01:00:57.993200 #1209] INFO -- : {:timestamp=>"2011-10-11T01:00:57.992353-0700", :foo=>"Hello", :example=>100, :message=>"Fizzle", :level=>:info} # I, [2011-10-11T01:00:57.993575 #1209] INFO -- : {:timestamp=>"2011-10-11T01:00:57.993517-0700", :message=>"Done in foo", :level=>:info} # class Cabin::Channel + @channel_lock = Mutex.new + @channels = Hash.new { |h,k| h[k] = Cabin::Channel.new } + class << self # Get a channel for a given identifier. If this identifier has never been # used, a new channel is created for it. # The default identifier is the application executable name. # # This is useful for using the same Cabin::Channel across your # entire application. def get(identifier=$0) - @channels ||= Hash.new { |h,k| h[k] = Cabin::Channel.new } - return @channels[identifier] + return @channel_lock.synchronize { @channels[identifier] } end # def Cabin::Channel.get + def set(identifier, channel) + return @channel_lock.synchronize { @channels[identifier] = channel } + end # def Cabin::Channel.set + + def each(&block) + @channel_lock.synchronize do + @channels.each do |identifier, channel| + yield identifier, channel + end + end + end # def Cabin::Channel.each + # Get a list of filters included in this class. def filters @filters ||= [] end # def Cabin::Channel.filters @@ -69,43 +84,54 @@ @filters ||= [] @filters << block end end # class << self - include Cabin::Mixins::Logger include Cabin::Mixins::Timestamp + include Cabin::Mixins::Logger include Cabin::Mixins::Timer # All channels come with a metrics provider. attr_accessor :metrics private # Create a new logging channel. # The default log level is 'info' def initialize - @outputs = [] + @subscribers = {} @data = {} @level = :info @metrics = Cabin::Metrics.new @metrics.channel = self + @subscriber_lock = Mutex.new end # def initialize # Subscribe a new input # New events will be sent to the subscriber using the '<<' method # foo << event + # + # Returns a subscription id you can use later to unsubscribe def subscribe(output) # Wrap ruby stdlib Logger if given. if output.is_a?(::Logger) output = Cabin::Outputs::StdlibLogger.new(output) elsif output.is_a?(::IO) output = Cabin::Outputs::IO.new(output) end - @outputs << output - # TODO(sissel): Return a method or object that allows you to easily - # unsubscribe? + @subscriber_lock.synchronize do + @subscribers[output.object_id] = output + end + return output.object_id end # def subscribe + + # Unsubscribe. Takes a 'subscription id' as returned by the subscribe method + def unsubscribe(id) + @subscriber_lock.synchronize do + @subscribers.delete(id) + end + end # def unsubscribe # Set some contextual map value def []=(key, value) @data[key] = value end # def []= @@ -127,25 +153,26 @@ # # A special key :timestamp is set at the time of this method call. The value # is a string ISO8601 timestamp with microsecond precision. def publish(data) event = {} - event.merge!(@data) # Merge any logger context + self.class.filters.each do |filter| + filter.call(event) + end if data.is_a?(String) event[:message] = data else event.merge!(data) end + event.merge!(@data) # Merge any logger context - self.class.filters.each do |filter| - filter.call(event) + @subscriber_lock.synchronize do + @subscribers.each do |id, output| + output << event + end end - - @outputs.each do |out| - out << event - end end # def publish def context ctx = Cabin::Context.new(self) return ctx @@ -156,7 +183,7 @@ data = { :message => data } end return data end # def dataify - public(:initialize, :context, :subscribe, :[]=, :[], :remove, :publish, :time, :context) + public(:initialize, :context, :subscribe, :unsubscribe, :[]=, :[], :remove, :publish, :time, :context) end # class Cabin::Channel