lib/cabin/channel.rb in cabin-0.4.3 vs lib/cabin/channel.rb in cabin-0.4.4
- old
+ new
@@ -4,11 +4,12 @@
require "cabin/namespace"
require "cabin/context"
require "cabin/outputs/stdlib-logger"
require "cabin/outputs/io"
require "cabin/metrics"
-require "logger"
+require "logger" # stdlib
+require "thread"
# A wonderful channel for logging.
#
# You can log normal messages through here, but you should be really
# shipping structured data. A message is just part of your data.
@@ -44,22 +45,36 @@
#
# I, [2011-10-11T01:00:57.993200 #1209] INFO -- : {:timestamp=>"2011-10-11T01:00:57.992353-0700", :foo=>"Hello", :example=>100, :message=>"Fizzle", :level=>:info}
# I, [2011-10-11T01:00:57.993575 #1209] INFO -- : {:timestamp=>"2011-10-11T01:00:57.993517-0700", :message=>"Done in foo", :level=>:info}
#
class Cabin::Channel
+ @channel_lock = Mutex.new
+ @channels = Hash.new { |h,k| h[k] = Cabin::Channel.new }
+
class << self
# Get a channel for a given identifier. If this identifier has never been
# used, a new channel is created for it.
# The default identifier is the application executable name.
#
# This is useful for using the same Cabin::Channel across your
# entire application.
def get(identifier=$0)
- @channels ||= Hash.new { |h,k| h[k] = Cabin::Channel.new }
- return @channels[identifier]
+ return @channel_lock.synchronize { @channels[identifier] }
end # def Cabin::Channel.get
+ def set(identifier, channel)
+ return @channel_lock.synchronize { @channels[identifier] = channel }
+ end # def Cabin::Channel.set
+
+ def each(&block)
+ @channel_lock.synchronize do
+ @channels.each do |identifier, channel|
+ yield identifier, channel
+ end
+ end
+ end # def Cabin::Channel.each
+
# Get a list of filters included in this class.
def filters
@filters ||= []
end # def Cabin::Channel.filters
@@ -69,43 +84,54 @@
@filters ||= []
@filters << block
end
end # class << self
- include Cabin::Mixins::Logger
include Cabin::Mixins::Timestamp
+ include Cabin::Mixins::Logger
include Cabin::Mixins::Timer
# All channels come with a metrics provider.
attr_accessor :metrics
private
# Create a new logging channel.
# The default log level is 'info'
def initialize
- @outputs = []
+ @subscribers = {}
@data = {}
@level = :info
@metrics = Cabin::Metrics.new
@metrics.channel = self
+ @subscriber_lock = Mutex.new
end # def initialize
# Subscribe a new input
# New events will be sent to the subscriber using the '<<' method
# foo << event
+ #
+ # Returns a subscription id you can use later to unsubscribe
def subscribe(output)
# Wrap ruby stdlib Logger if given.
if output.is_a?(::Logger)
output = Cabin::Outputs::StdlibLogger.new(output)
elsif output.is_a?(::IO)
output = Cabin::Outputs::IO.new(output)
end
- @outputs << output
- # TODO(sissel): Return a method or object that allows you to easily
- # unsubscribe?
+ @subscriber_lock.synchronize do
+ @subscribers[output.object_id] = output
+ end
+ return output.object_id
end # def subscribe
+
+ # Unsubscribe. Takes a 'subscription id' as returned by the subscribe method
+ def unsubscribe(id)
+ @subscriber_lock.synchronize do
+ @subscribers.delete(id)
+ end
+ end # def unsubscribe
# Set some contextual map value
def []=(key, value)
@data[key] = value
end # def []=
@@ -127,25 +153,26 @@
#
# A special key :timestamp is set at the time of this method call. The value
# is a string ISO8601 timestamp with microsecond precision.
def publish(data)
event = {}
- event.merge!(@data) # Merge any logger context
+ self.class.filters.each do |filter|
+ filter.call(event)
+ end
if data.is_a?(String)
event[:message] = data
else
event.merge!(data)
end
+ event.merge!(@data) # Merge any logger context
- self.class.filters.each do |filter|
- filter.call(event)
+ @subscriber_lock.synchronize do
+ @subscribers.each do |id, output|
+ output << event
+ end
end
-
- @outputs.each do |out|
- out << event
- end
end # def publish
def context
ctx = Cabin::Context.new(self)
return ctx
@@ -156,7 +183,7 @@
data = { :message => data }
end
return data
end # def dataify
- public(:initialize, :context, :subscribe, :[]=, :[], :remove, :publish, :time, :context)
+ public(:initialize, :context, :subscribe, :unsubscribe, :[]=, :[], :remove, :publish, :time, :context)
end # class Cabin::Channel