lib/timber/log_devices/http.rb in timber-2.0.20 vs lib/timber/log_devices/http.rb in timber-2.0.21

- old
+ new

@@ -3,11 +3,12 @@ module Timber module LogDevices # A highly efficient log device that buffers and delivers log messages over HTTPS to # the Timber API. It uses batches, keep-alive connections, and msgpack to deliver logs with - # high-throughput and little overhead. + # high-throughput and little overhead. All log preparation and delivery is done asynchronously + # in a thread as not to block application execution. # # See {#initialize} for options and more details. class HTTP # @private class LogMsgQueue @@ -116,10 +117,11 @@ @flush_continuously = options[:flush_continuously] != false @flush_interval = options[:flush_interval] || 1 # 1 second @requests_per_conn = options[:requests_per_conn] || 2_500 @msg_queue = LogMsgQueue.new(@batch_size) @request_queue = options[:request_queue] || SizedQueue.new(3) + @successive_error_count = 0 @requests_in_flight = 0 end # Write a new log line message to the buffer, and deliver if the msg exceeds the # payload limit. @@ -131,11 +133,11 @@ # the current process is forked. This is the case with various web servers, # such as phusion passenger. ensure_flush_threads_are_started if @msg_queue.full? - debug_logger.debug("Flushing timber buffer via write") if debug_logger + debug_logger.debug("Flushing HTTP buffer via write") if debug_logger flush end true end @@ -152,13 +154,30 @@ @request_queue.enq(req) end # Closes the log device, cleans up, and attempts one last delivery. def close + # Kill the flush thread immediately since we are about to flush again. @flush_thread.kill if @flush_thread - @outlet_thread.kill if @outlet_thread + + # Flush all remaining messages flush + + # Kill the request_outlet thread gracefully. We do not want to kill it while a + # request is inflight. Ideally we'd let it finish before we die. + if @request_outlet_thread + 4.times do + if @requests_in_flight == 0 && @request_queue.size == 0 + @request_outlet_thread.kill + break + else + debug_logger.error("Busy delivering the final log messages, " + + "connection will close when complete.") + sleep 1 + end + end + end end private def debug_logger Timber::Config.instance.debug_logger @@ -168,12 +187,12 @@ # started. This is called lazily from #write so that we # only start the threads as needed, but it also ensures # threads are started after process forking. def ensure_flush_threads_are_started if @flush_continuously - if @outlet_thread.nil? || !@outlet_thread.alive? - @outlet_thread = Thread.new { outlet } + if @request_outlet_thread.nil? || !@request_outlet_thread.alive? + @request_outlet_thread = Thread.new { request_outlet } end if @flush_thread.nil? || !@flush_thread.alive? @flush_thread = Thread.new { intervaled_flush } end @@ -181,67 +200,92 @@ end def intervaled_flush # Wait specified time period before starting sleep @flush_interval + loop do begin if intervaled_flush_ready? - debug_logger.debug("Flushing timber buffer via the interval") if debug_logger + debug_logger.debug("Flushing HTTP buffer via the interval") if debug_logger flush end - sleep(0.1) + + sleep(0.5) rescue Exception => e - logger.error("Timber intervaled flush failed: #{e.inspect}\n\n#{e.backtrace}") + logger.error("Intervaled HTTP flush failed: #{e.inspect}\n\n#{e.backtrace}") end end end def intervaled_flush_ready? @last_flush.nil? || (Time.now.to_f - @last_flush.to_f).abs >= @flush_interval end - def outlet + def build_http + http = Net::HTTP.new(@timber_url.host, @timber_url.port) + http.set_debug_output(debug_logger) if debug_logger + http.use_ssl = true if @timber_url.scheme == 'https' + http.read_timeout = 30 + http.ssl_timeout = 10 + http.open_timeout = 10 + http + end + + def request_outlet loop do - http = Net::HTTP.new(@timber_url.host, @timber_url.port) - http.set_debug_output(debug_logger) if debug_logger - http.use_ssl = true if @timber_url.scheme == 'https' - http.read_timeout = 30 - http.ssl_timeout = 10 - http.open_timeout = 10 + http = build_http begin - debug_logger.info("Starting Timber HTTP connection") if debug_logger - http.start do |conn| - num_reqs = 0 - while num_reqs < @requests_per_conn - if debug_logger - debug_logger.debug("Waiting on next Timber request") - debug_logger.debug("Number of threads waiting on Timber request queue: #{@request_queue.num_waiting}") - end + debug_logger.info("Starting HTTP connection") if debug_logger - # Blocks waiting for a request. - req = @request_queue.deq - @requests_in_flight += 1 - resp = nil - begin - resp = conn.request(req) - rescue => e - debug_logger.error("Timber request error: #{e.message}") if debug_logger - next - ensure - @requests_in_flight -= 1 - end - num_reqs += 1 - debug_logger.debug("Timber request successful: #{resp.code}") if debug_logger - end + http.start do |conn| + deliver_requests(conn) end rescue => e - debug_logger.error("Timber request error: #{e.message}") if debug_logger + debug_logger.error("#request_outlet error: #{e.message}") if debug_logger ensure - debug_logger.debug("Finishing Timber HTTP connection") if debug_logger + debug_logger.info("Finishing HTTP connection") if debug_logger http.finish if http.started? end + end + end + + def deliver_requests(conn) + num_reqs = 0 + + while num_reqs < @requests_per_conn + debug_logger.info("Waiting on next request, threads waiting: #{@request_queue.num_waiting}") if debug_logger + + # Blocks waiting for a request. + req = @request_queue.deq + @requests_in_flight += 1 + + begin + resp = conn.request(req) + rescue => e + debug_logger.error("#deliver_request error: #{e.message}") if debug_logger + + @successive_error_count += 1 + + # Back off so that we don't hammer the Timber API. + calculated_backoff = @successive_error_count * 2 + backoff = calculated_backoff > 30 ? 30 : calculated_backoff + + debug_logger.error("Backing off #{backoff} seconds, error ##{@successive_error_count}") if debug_logger + + sleep backoff + + # Throw the request back on the queue for a retry + @request_queue.enq(req) + return false + ensure + @requests_in_flight -= 1 + end + + @successive_error_count = 0 + num_reqs += 1 + debug_logger.info("Request successful: #{resp.code}") if debug_logger end end def authorization_payload @authorization_payload ||= "Basic #{Base64.urlsafe_encode64(@api_key).chomp}" \ No newline at end of file