lib/timber/log_devices/http.rb in timber-2.0.19 vs lib/timber/log_devices/http.rb in timber-2.0.20

- old
+ new

@@ -1,5 +1,8 @@ +require "base64" +require "net/https" + module Timber module LogDevices # A highly efficient log device that buffers and delivers log messages over HTTPS to # the Timber API. It uses batches, keep-alive connections, and msgpack to deliver logs with # high-throughput and little overhead. @@ -78,10 +81,14 @@ # @param [Hash] options the options to create a HTTP log device with. # @option attributes [Symbol] :batch_size (1000) Determines the maximum of log lines in # each HTTP payload. If the queue exceeds this limit an HTTP request will be issued. Bigger # payloads mean higher throughput, but also use more memory. Timber will not accept # payloads larger than 1mb. + # @option attributes [Symbol] :flush_continuously (true) This should only be disabled under + # special circumstsances (like test suites). Setting this to `false` disables the + # continuous flushing of log message. As a result, flushing must be handled externally + # via the #flush method. # @option attributes [Symbol] :flush_interval (1) How often the client should # attempt to deliver logs to the Timber API in fractional seconds. The HTTP client buffers # logs and this options represents how often that will happen, assuming `:batch_byte_size` # is not met. # @option attributes [Symbol] :requests_per_conn (2500) The number of requests to send over a @@ -104,33 +111,49 @@ # Timber::Logger.new(http_log_device) def initialize(api_key, options = {}) @api_key = api_key || raise(ArgumentError.new("The api_key parameter cannot be blank")) @timber_url = URI.parse(options[:timber_url] || ENV['TIMBER_URL'] || TIMBER_URL) @batch_size = options[:batch_size] || 1_000 + @flush_continuously = options[:flush_continuously] != false @flush_interval = options[:flush_interval] || 1 # 1 second @requests_per_conn = options[:requests_per_conn] || 2_500 @msg_queue = LogMsgQueue.new(@batch_size) @request_queue = options[:request_queue] || SizedQueue.new(3) @requests_in_flight = 0 - - if options[:threads] != false - @outlet_thread = Thread.new { outlet } - @flush_thread = Thread.new { intervaled_flush } - end end # Write a new log line message to the buffer, and deliver if the msg exceeds the # payload limit. def write(msg) @msg_queue.enqueue(msg) + + # Lazily start flush threads to ensure threads are alive after forking processes. + # If the threads are started during instantiation they will not be copied when + # the current process is forked. This is the case with various web servers, + # such as phusion passenger. + ensure_flush_threads_are_started + if @msg_queue.full? debug_logger.debug("Flushing timber buffer via write") if debug_logger flush end true end + def flush + @last_flush = Time.now + msgs = @msg_queue.flush + return if msgs.empty? + + req = Net::HTTP::Post.new(@timber_url.path) + req['Authorization'] = authorization_payload + req['Content-Type'] = CONTENT_TYPE + req['User-Agent'] = USER_AGENT + req.body = msgs.to_msgpack + @request_queue.enq(req) + end + # Closes the log device, cleans up, and attempts one last delivery. def close @flush_thread.kill if @flush_thread @outlet_thread.kill if @outlet_thread flush @@ -139,20 +162,23 @@ private def debug_logger Timber::Config.instance.debug_logger end - def flush - @last_flush = Time.now - msgs = @msg_queue.flush - return if msgs.empty? + # This is a convenience method to ensure the flush thread are + # started. This is called lazily from #write so that we + # only start the threads as needed, but it also ensures + # threads are started after process forking. + def ensure_flush_threads_are_started + if @flush_continuously + if @outlet_thread.nil? || !@outlet_thread.alive? + @outlet_thread = Thread.new { outlet } + end - req = Net::HTTP::Post.new(@timber_url.path) - req['Authorization'] = authorization_payload - req['Content-Type'] = CONTENT_TYPE - req['User-Agent'] = USER_AGENT - req.body = msgs.to_msgpack - @request_queue.enq(req) + if @flush_thread.nil? || !@flush_thread.alive? + @flush_thread = Thread.new { intervaled_flush } + end + end end def intervaled_flush # Wait specified time period before starting sleep @flush_interval \ No newline at end of file