lib/timber/log_devices/http.rb in timber-2.0.20 vs lib/timber/log_devices/http.rb in timber-2.0.21
- old
+ new
@@ -3,11 +3,12 @@
module Timber
module LogDevices
# A highly efficient log device that buffers and delivers log messages over HTTPS to
# the Timber API. It uses batches, keep-alive connections, and msgpack to deliver logs with
- # high-throughput and little overhead.
+ # high-throughput and little overhead. All log preparation and delivery is done asynchronously
+ # in a thread as not to block application execution.
#
# See {#initialize} for options and more details.
class HTTP
# @private
class LogMsgQueue
@@ -116,10 +117,11 @@
@flush_continuously = options[:flush_continuously] != false
@flush_interval = options[:flush_interval] || 1 # 1 second
@requests_per_conn = options[:requests_per_conn] || 2_500
@msg_queue = LogMsgQueue.new(@batch_size)
@request_queue = options[:request_queue] || SizedQueue.new(3)
+ @successive_error_count = 0
@requests_in_flight = 0
end
# Write a new log line message to the buffer, and deliver if the msg exceeds the
# payload limit.
@@ -131,11 +133,11 @@
# the current process is forked. This is the case with various web servers,
# such as phusion passenger.
ensure_flush_threads_are_started
if @msg_queue.full?
- debug_logger.debug("Flushing timber buffer via write") if debug_logger
+ debug_logger.debug("Flushing HTTP buffer via write") if debug_logger
flush
end
true
end
@@ -152,13 +154,30 @@
@request_queue.enq(req)
end
# Closes the log device, cleans up, and attempts one last delivery.
def close
+ # Kill the flush thread immediately since we are about to flush again.
@flush_thread.kill if @flush_thread
- @outlet_thread.kill if @outlet_thread
+
+ # Flush all remaining messages
flush
+
+ # Kill the request_outlet thread gracefully. We do not want to kill it while a
+ # request is inflight. Ideally we'd let it finish before we die.
+ if @request_outlet_thread
+ 4.times do
+ if @requests_in_flight == 0 && @request_queue.size == 0
+ @request_outlet_thread.kill
+ break
+ else
+ debug_logger.error("Busy delivering the final log messages, " +
+ "connection will close when complete.")
+ sleep 1
+ end
+ end
+ end
end
private
def debug_logger
Timber::Config.instance.debug_logger
@@ -168,12 +187,12 @@
# started. This is called lazily from #write so that we
# only start the threads as needed, but it also ensures
# threads are started after process forking.
def ensure_flush_threads_are_started
if @flush_continuously
- if @outlet_thread.nil? || !@outlet_thread.alive?
- @outlet_thread = Thread.new { outlet }
+ if @request_outlet_thread.nil? || !@request_outlet_thread.alive?
+ @request_outlet_thread = Thread.new { request_outlet }
end
if @flush_thread.nil? || !@flush_thread.alive?
@flush_thread = Thread.new { intervaled_flush }
end
@@ -181,67 +200,92 @@
end
def intervaled_flush
# Wait specified time period before starting
sleep @flush_interval
+
loop do
begin
if intervaled_flush_ready?
- debug_logger.debug("Flushing timber buffer via the interval") if debug_logger
+ debug_logger.debug("Flushing HTTP buffer via the interval") if debug_logger
flush
end
- sleep(0.1)
+
+ sleep(0.5)
rescue Exception => e
- logger.error("Timber intervaled flush failed: #{e.inspect}\n\n#{e.backtrace}")
+ logger.error("Intervaled HTTP flush failed: #{e.inspect}\n\n#{e.backtrace}")
end
end
end
def intervaled_flush_ready?
@last_flush.nil? || (Time.now.to_f - @last_flush.to_f).abs >= @flush_interval
end
- def outlet
+ def build_http
+ http = Net::HTTP.new(@timber_url.host, @timber_url.port)
+ http.set_debug_output(debug_logger) if debug_logger
+ http.use_ssl = true if @timber_url.scheme == 'https'
+ http.read_timeout = 30
+ http.ssl_timeout = 10
+ http.open_timeout = 10
+ http
+ end
+
+ def request_outlet
loop do
- http = Net::HTTP.new(@timber_url.host, @timber_url.port)
- http.set_debug_output(debug_logger) if debug_logger
- http.use_ssl = true if @timber_url.scheme == 'https'
- http.read_timeout = 30
- http.ssl_timeout = 10
- http.open_timeout = 10
+ http = build_http
begin
- debug_logger.info("Starting Timber HTTP connection") if debug_logger
- http.start do |conn|
- num_reqs = 0
- while num_reqs < @requests_per_conn
- if debug_logger
- debug_logger.debug("Waiting on next Timber request")
- debug_logger.debug("Number of threads waiting on Timber request queue: #{@request_queue.num_waiting}")
- end
+ debug_logger.info("Starting HTTP connection") if debug_logger
- # Blocks waiting for a request.
- req = @request_queue.deq
- @requests_in_flight += 1
- resp = nil
- begin
- resp = conn.request(req)
- rescue => e
- debug_logger.error("Timber request error: #{e.message}") if debug_logger
- next
- ensure
- @requests_in_flight -= 1
- end
- num_reqs += 1
- debug_logger.debug("Timber request successful: #{resp.code}") if debug_logger
- end
+ http.start do |conn|
+ deliver_requests(conn)
end
rescue => e
- debug_logger.error("Timber request error: #{e.message}") if debug_logger
+ debug_logger.error("#request_outlet error: #{e.message}") if debug_logger
ensure
- debug_logger.debug("Finishing Timber HTTP connection") if debug_logger
+ debug_logger.info("Finishing HTTP connection") if debug_logger
http.finish if http.started?
end
+ end
+ end
+
+ def deliver_requests(conn)
+ num_reqs = 0
+
+ while num_reqs < @requests_per_conn
+ debug_logger.info("Waiting on next request, threads waiting: #{@request_queue.num_waiting}") if debug_logger
+
+ # Blocks waiting for a request.
+ req = @request_queue.deq
+ @requests_in_flight += 1
+
+ begin
+ resp = conn.request(req)
+ rescue => e
+ debug_logger.error("#deliver_request error: #{e.message}") if debug_logger
+
+ @successive_error_count += 1
+
+ # Back off so that we don't hammer the Timber API.
+ calculated_backoff = @successive_error_count * 2
+ backoff = calculated_backoff > 30 ? 30 : calculated_backoff
+
+ debug_logger.error("Backing off #{backoff} seconds, error ##{@successive_error_count}") if debug_logger
+
+ sleep backoff
+
+ # Throw the request back on the queue for a retry
+ @request_queue.enq(req)
+ return false
+ ensure
+ @requests_in_flight -= 1
+ end
+
+ @successive_error_count = 0
+ num_reqs += 1
+ debug_logger.info("Request successful: #{resp.code}") if debug_logger
end
end
def authorization_payload
@authorization_payload ||= "Basic #{Base64.urlsafe_encode64(@api_key).chomp}"
\ No newline at end of file