lib/timber/log_devices/http.rb in timber-2.0.19 vs lib/timber/log_devices/http.rb in timber-2.0.20
- old
+ new
@@ -1,5 +1,8 @@
+require "base64"
+require "net/https"
+
module Timber
module LogDevices
# A highly efficient log device that buffers and delivers log messages over HTTPS to
# the Timber API. It uses batches, keep-alive connections, and msgpack to deliver logs with
# high-throughput and little overhead.
@@ -78,10 +81,14 @@
# @param [Hash] options the options to create a HTTP log device with.
# @option attributes [Symbol] :batch_size (1000) Determines the maximum of log lines in
# each HTTP payload. If the queue exceeds this limit an HTTP request will be issued. Bigger
# payloads mean higher throughput, but also use more memory. Timber will not accept
# payloads larger than 1mb.
+ # @option attributes [Symbol] :flush_continuously (true) This should only be disabled under
+ # special circumstsances (like test suites). Setting this to `false` disables the
+ # continuous flushing of log message. As a result, flushing must be handled externally
+ # via the #flush method.
# @option attributes [Symbol] :flush_interval (1) How often the client should
# attempt to deliver logs to the Timber API in fractional seconds. The HTTP client buffers
# logs and this options represents how often that will happen, assuming `:batch_byte_size`
# is not met.
# @option attributes [Symbol] :requests_per_conn (2500) The number of requests to send over a
@@ -104,33 +111,49 @@
# Timber::Logger.new(http_log_device)
def initialize(api_key, options = {})
@api_key = api_key || raise(ArgumentError.new("The api_key parameter cannot be blank"))
@timber_url = URI.parse(options[:timber_url] || ENV['TIMBER_URL'] || TIMBER_URL)
@batch_size = options[:batch_size] || 1_000
+ @flush_continuously = options[:flush_continuously] != false
@flush_interval = options[:flush_interval] || 1 # 1 second
@requests_per_conn = options[:requests_per_conn] || 2_500
@msg_queue = LogMsgQueue.new(@batch_size)
@request_queue = options[:request_queue] || SizedQueue.new(3)
@requests_in_flight = 0
-
- if options[:threads] != false
- @outlet_thread = Thread.new { outlet }
- @flush_thread = Thread.new { intervaled_flush }
- end
end
# Write a new log line message to the buffer, and deliver if the msg exceeds the
# payload limit.
def write(msg)
@msg_queue.enqueue(msg)
+
+ # Lazily start flush threads to ensure threads are alive after forking processes.
+ # If the threads are started during instantiation they will not be copied when
+ # the current process is forked. This is the case with various web servers,
+ # such as phusion passenger.
+ ensure_flush_threads_are_started
+
if @msg_queue.full?
debug_logger.debug("Flushing timber buffer via write") if debug_logger
flush
end
true
end
+ def flush
+ @last_flush = Time.now
+ msgs = @msg_queue.flush
+ return if msgs.empty?
+
+ req = Net::HTTP::Post.new(@timber_url.path)
+ req['Authorization'] = authorization_payload
+ req['Content-Type'] = CONTENT_TYPE
+ req['User-Agent'] = USER_AGENT
+ req.body = msgs.to_msgpack
+ @request_queue.enq(req)
+ end
+
# Closes the log device, cleans up, and attempts one last delivery.
def close
@flush_thread.kill if @flush_thread
@outlet_thread.kill if @outlet_thread
flush
@@ -139,20 +162,23 @@
private
def debug_logger
Timber::Config.instance.debug_logger
end
- def flush
- @last_flush = Time.now
- msgs = @msg_queue.flush
- return if msgs.empty?
+ # This is a convenience method to ensure the flush thread are
+ # started. This is called lazily from #write so that we
+ # only start the threads as needed, but it also ensures
+ # threads are started after process forking.
+ def ensure_flush_threads_are_started
+ if @flush_continuously
+ if @outlet_thread.nil? || !@outlet_thread.alive?
+ @outlet_thread = Thread.new { outlet }
+ end
- req = Net::HTTP::Post.new(@timber_url.path)
- req['Authorization'] = authorization_payload
- req['Content-Type'] = CONTENT_TYPE
- req['User-Agent'] = USER_AGENT
- req.body = msgs.to_msgpack
- @request_queue.enq(req)
+ if @flush_thread.nil? || !@flush_thread.alive?
+ @flush_thread = Thread.new { intervaled_flush }
+ end
+ end
end
def intervaled_flush
# Wait specified time period before starting
sleep @flush_interval
\ No newline at end of file