lib/semantic_logger/appender/async.rb in semantic_logger-4.3.1 vs lib/semantic_logger/appender/async.rb in semantic_logger-4.4.0

- old
+ new

@@ -4,22 +4,23 @@ module Appender # Allow any appender to run asynchronously in a separate thread. class Async extend Forwardable - attr_accessor :logger, :lag_check_interval, :lag_threshold_s - attr_reader :queue, :appender + attr_accessor :lag_check_interval, :lag_threshold_s + attr_reader :queue, :appender, :max_queue_size # Forward methods that can be called directly def_delegator :@appender, :name def_delegator :@appender, :should_log? def_delegator :@appender, :filter def_delegator :@appender, :host def_delegator :@appender, :application def_delegator :@appender, :level def_delegator :@appender, :level= def_delegator :@appender, :logger + def_delegator :@appender, :logger= # Appender proxy to allow an existing appender to run asynchronously in a separate thread. # # Parameters: # max_queue_size: [Integer] @@ -41,21 +42,28 @@ @appender = appender @lag_check_interval = lag_check_interval @lag_threshold_s = lag_threshold_s @thread = nil - - if max_queue_size == -1 - @queue = Queue.new - @capped = false - else - @queue = SizedQueue.new(max_queue_size) - @capped = true - end + @max_queue_size = max_queue_size + create_queue thread end + # Re-open appender after a fork + def reopen + # Workaround CRuby crash on fork by recreating queue on reopen + # https://github.com/rocketjob/semantic_logger/issues/103 + @queue&.close + create_queue + + appender.reopen if appender.respond_to?(:reopen) + + @thread.kill if @thread&.alive? + @thread = Thread.new { process } + end + # Returns [true|false] if the queue has a capped size. def capped? @capped end @@ -89,10 +97,20 @@ submit_request(:close) end private + def create_queue + if max_queue_size == -1 + @queue = Queue.new + @capped = false + else + @queue = SizedQueue.new(max_queue_size) + @capped = true + end + end + # Separate thread for batching up log messages before writing. def process # This thread is designed to never go down unless the main thread terminates # or the appender is closed. Thread.current.name = logger.name @@ -138,9 +156,10 @@ end else break unless process_message(message) end end + logger.trace 'Async: Queue Closed' end # Returns false when message processing should be stopped def process_message(message) case message[:command]