lib/semantic_logger/processor.rb in semantic_logger-4.1.1 vs lib/semantic_logger/processor.rb in semantic_logger-4.2.0

- old
+ new

@@ -1,129 +1,71 @@ module SemanticLogger # Thread that submits and processes log requests class Processor + # Returns [Appender::Async => SemanticLogger::Processor] the global instance of this processor + # wrapped in the Async proxy so that all logging is asynchronous in a thread of its own. + # + # More than one instance can be created if needed. + def self.instance + @processor + end + # Start the appender thread def self.start - return false if active? - @thread = Thread.new { process_requests } - raise 'Failed to start Appender Thread' unless @thread + return false if instance.active? + instance.thread true end # Returns true if the appender_thread is active def self.active? - @thread && @thread.alive? + instance.alive? end # Returns [Integer] the number of log entries waiting to be written to the appenders. # # When this number grows it is because the logging appender thread is not # able to write to the appenders fast enough. Either reduce the amount of # logging, increase the log level, reduce the number of appenders, or # look into speeding up the appenders themselves def self.queue_size - queue.size + instance.queue.size end - # Flush all queued log entries disk, database, etc. - # All queued log messages are written and then each appender is flushed in turn. - def self.flush - submit_request(:flush) - end - - # Close all appenders and flush any outstanding messages. - def self.close - submit_request(:close) - end - # Add log request to the queue for processing. + # Log subscribers are called inline before handing off to the queue. def self.<<(log) - return unless active? - - call_log_subscribers(log) - queue << log + instance.appender.send(:call_log_subscribers, log) + instance.log(log) end - # Submit command and wait for reply - def self.submit_request(command) - return false unless active? - - msg = "Too many queued log messages: #{queue_size}, running command: #{command}" - if queue_size > 1_000 - logger.warn msg - elsif queue_size > 100 - logger.info msg - elsif queue_size > 0 - logger.trace msg - end - - reply_queue = Queue.new - queue << {command: command, reply_queue: reply_queue} - reply_queue.pop - end - - # Allow the internal logger to be overridden from its default of STDERR - # Can be replaced with another Ruby logger or Rails logger, but never to - # SemanticLogger::Logger itself since it is for reporting problems - # while trying to log to the various appenders - def self.logger=(logger) - @logger = logger - end - # Returns the check_interval which is the number of messages between checks - # to determine if the appender thread is falling behind + # to determine if the appender thread is falling behind. def self.lag_check_interval - @lag_check_interval + instance.lag_check_interval end # Set the check_interval which is the number of messages between checks - # to determine if the appender thread is falling behind + # to determine if the appender thread is falling behind. def self.lag_check_interval=(lag_check_interval) - @lag_check_interval = lag_check_interval + instance.lag_check_interval = lag_check_interval end # Returns the amount of time in seconds - # to determine if the appender thread is falling behind + # to determine if the appender thread is falling behind. def self.lag_threshold_s - @lag_threshold_s + instance.lag_threshold_s end - def self.on_metric(options = {}, &block) - # Backward compatibility - options = options.is_a?(Hash) ? options.dup : {appender: options} - subscriber = block || options.delete(:appender) - - # Convert symbolized metrics appender to an actual object - subscriber = SemanticLogger::Appender.constantize_symbol(subscriber, 'SemanticLogger::Metrics').new(options) if subscriber.is_a?(Symbol) - - raise('When supplying a metrics subscriber, it must support the #call method') unless subscriber.is_a?(Proc) || subscriber.respond_to?(:call) - subscribers = (@metric_subscribers ||= Concurrent::Array.new) - subscribers << subscriber unless subscribers.include?(subscriber) + # Allow the internal logger to be overridden from its default of STDERR + # Can be replaced with another Ruby logger or Rails logger, but never to + # SemanticLogger::Logger itself since it is for reporting problems + # while trying to log to the various appenders + def self.logger=(logger) + @logger = logger end - def self.on_log(object = nil, &block) - subscriber = block || object - - raise('When supplying an on_log subscriber, it must support the #call method') unless subscriber.is_a?(Proc) || subscriber.respond_to?(:call) - subscribers = (@log_subscribers ||= Concurrent::Array.new) - subscribers << subscriber unless subscribers.include?(subscriber) - end - - private - - @thread = nil - @queue = Queue.new - @lag_check_interval = 5000 - @lag_threshold_s = 30 - @metric_subscribers = nil - @log_subscribers = nil - - # Queue to hold messages that need to be logged to the various appenders - def self.queue - @queue - end - # Internal logger for SemanticLogger # For example when an appender is not working etc.. # By default logs to STDERR def self.logger @logger ||= begin @@ -131,107 +73,37 @@ l.name = name l end end - # Separate appender thread responsible for reading log messages and - # calling the appenders in it's thread - def self.process_requests - # This thread is designed to never go down unless the main thread terminates - # Before terminating at_exit is used to flush all the appenders - # - # Should any appender fail to log or flush, the exception is logged and - # other appenders will still be called - Thread.current.name = 'SemanticLogger::Processor' - logger.trace "V#{VERSION} Processor thread active" - begin - count = 0 - while message = queue.pop - if message.is_a?(Log) - call_appenders(message) - call_metric_subscribers(message) if message.metric - count += 1 - # Check every few log messages whether this appender thread is falling behind - if count > lag_check_interval - if (diff = Time.now - message.time) > lag_threshold_s - logger.warn "Appender thread has fallen behind by #{diff} seconds with #{queue_size} messages queued up. Consider reducing the log level or changing the appenders" - end - count = 0 - end - else - case message[:command] - when :flush - flush_appenders - message[:reply_queue] << true if message[:reply_queue] - when :close - close_appenders - message[:reply_queue] << true if message[:reply_queue] - break - else - logger.warn "Appender thread: Ignoring unknown command: #{message[:command]}" - end - end - end - rescue Exception => exception - # This block may be called after the file handles have been released by Ruby - begin - logger.error 'Appender thread restarting due to exception', exception - rescue Exception - nil - end - retry - ensure - @thread = nil - # This block may be called after the file handles have been released by Ruby - begin - logger.trace 'Appender thread has stopped' - rescue Exception - nil - end - end - end + attr_accessor :logger, :log_subscribers - # Call Metric subscribers - def self.call_metric_subscribers(log) - # If no subscribers registered, then return immediately - return unless @metric_subscribers - - @metric_subscribers.each do |subscriber| - begin - subscriber.call(log) - rescue Exception => exc - logger.error 'Exception calling metrics subscriber', exc - end - end + def initialize + @log_subscribers = nil + @logger = self.class.logger.dup + @logger.name = self.class.name end - # Call on_log subscribers - def self.call_log_subscribers(log) - # If no subscribers registered, then return immediately - return unless @log_subscribers + def on_log(object = nil, &block) + subscriber = block || object - @log_subscribers.each do |subscriber| - begin - subscriber.call(log) - rescue Exception => exc - logger.error 'Exception calling :on_log subscriber', exc - end - end + raise('When supplying an on_log subscriber, it must support the #call method') unless subscriber.is_a?(Proc) || subscriber.respond_to?(:call) + subscribers = (@log_subscribers ||= Concurrent::Array.new) + subscribers << subscriber unless subscribers.include?(subscriber) end - # Call Appenders - def self.call_appenders(log) + def log(log) SemanticLogger.appenders.each do |appender| begin - appender.log(log) + appender.log(log) if appender.should_log?(log) rescue Exception => exc logger.error "Appender thread: Failed to log to appender: #{appender.inspect}", exc end end end - def self.flush_appenders + def flush SemanticLogger.appenders.each do |appender| begin logger.trace "Appender thread: Flushing appender: #{appender.name}" appender.flush rescue Exception => exc @@ -239,11 +111,11 @@ end end logger.trace 'Appender thread: All appenders flushed' end - def self.close_appenders + def close SemanticLogger.appenders.each do |appender| begin logger.trace "Appender thread: Closing appender: #{appender.name}" appender.flush appender.close @@ -251,9 +123,35 @@ rescue Exception => exc logger.error "Appender thread: Failed to close appender: #{appender.inspect}", exc end end logger.trace 'Appender thread: All appenders closed and removed from appender list' + end + + private + + def self.create_instance + SemanticLogger::Appender::Async.new( + name: 'SemanticLogger::Processor', + appender: new, + max_queue_size: -1 + ) + end + + @processor = create_instance + + # Call on_log subscribers + def call_log_subscribers(log) + # If no subscribers registered, then return immediately + return unless log_subscribers + + log_subscribers.each do |subscriber| + begin + subscriber.call(log) + rescue Exception => exc + logger.error 'Exception calling :on_log subscriber', exc + end + end end end end