lib/timber/log_devices/http.rb in timber-1.0.8 vs lib/timber/log_devices/http.rb in timber-1.0.9

- old
+ new

@@ -1,131 +1,230 @@ -require "timber/log_devices/http/triggered_buffer" - module Timber module LogDevices - # A log device that buffers and delivers log messages over HTTPS to the Timber API in batches. - # The buffer and delivery strategy are very efficient and the log messages will be delivered in - # msgpack format. + # A log device that buffers and delivers log messages over HTTPS to the Timber API. + # It uses batches, keep-alive connections, and messagepack to delivery logs with + # high-throughput and little overhead. # # See {#initialize} for options and more details. class HTTP - API_URI = URI.parse(ENV["TIMBER_INGESTION_URL"] || "https://logs.timber.io/frames") - CONTENT_TYPE = "application/x-timber-msgpack-frame-1".freeze - CONNECTION_HEADER = "keep-alive".freeze - USER_AGENT = "Timber Ruby Gem/#{Timber::VERSION}".freeze - HTTPS = Net::HTTP.new(API_URI.host, API_URI.port).tap do |https| - https.use_ssl = true - https.read_timeout = 30 - https.ssl_timeout = 10 - # Ruby 1.9.X doesn't have this setting. - if https.respond_to?(:keep_alive_timeout=) - https.keep_alive_timeout = 60 + # @private + class LogMsgQueue + MAX_MSG_BYTES = 50_000 # 50kb + + def initialize(max_bytes) + @lock = Mutex.new + @max_bytes = max_bytes + @array = [] + @bytesize = 0 end - https.open_timeout = 10 + + def enqueue(msg) + if msg.bytesize > MAX_MSG_BYTES + raise ArgumentError.new("Log message exceeds the #{MAX_MSG_BYTES} bytes limit") + end + + @lock.synchronize do + @array << msg + @bytesize += msg.bytesize + end + end + + def flush + @lock.synchronize do + old = @array + @array = [] + @bytesize = 0 + return old + end + end + + def full? + @lock.synchronize do + @bytesize >= @max_bytes + end + end + + def size + @array.size + end end + + # Works like SizedQueue, but drops message instead of blocking. Pass one of these in + # to {HTTP#intiialize} via the :request_queue option if you'd prefer to drop messages + # in the event of a buffer overflow instead of applying back pressure. + class DroppingSizedQueue < SizedQueue + # Returns true/false depending on whether the queue is full or not + def push(obj) + @mutex.synchronize do + return false unless @que.length < @max + + @que.push obj + begin + t = @waiting.shift + t.wakeup if t + rescue ThreadError + retry + end + return true + end + end + end + + TIMBER_URL = "https://logs.timber.io/frames".freeze + CONTENT_TYPE = "application/x-timber-msgpack-frame-1".freeze + USER_AGENT = "Timber Ruby Gem/#{Timber::VERSION}".freeze DELIVERY_FREQUENCY_SECONDS = 2.freeze - RETRY_LIMIT = 3.freeze + RETRY_LIMIT = 5.freeze BACKOFF_RATE_SECONDS = 3.freeze # Instantiates a new HTTP log device that can be passed to {Timber::Logger#initialize}. # + # The class maintains a buffer which is flushed in batches to the Timber API. 2 + # options control when the flush happens, `:batch_byte_size` and `:flush_interval`. + # If either of these are surpassed, the buffer will be flushed. + # + # By default, the buffer will apply back pressure log messages are generated faster than + # the client can delivery them. But you can drop messages instead by passing a + # {DroppingSizedQueue} via the `:request_queue` option. + # # @param api_key [String] The API key provided to you after you add your application to # [Timber](https://timber.io). # @param [Hash] options the options to create a HTTP log device with. - # @option attributes [Symbol] :payload_limit_bytes Determines the maximum size in bytes that - # and HTTP payload can be. Please see {TriggereBuffer#initialize} for the default. - # @option attributes [Symbol] :buffer_limit_bytes Determines the maximum size of the total - # buffer. This should be many times larger than the `:payload_limit_bytes`. - # Please see {TriggereBuffer#initialize} for the default. - # @option attributes [Symbol] :buffer_overflow_handler (nil) When a single message exceeds - # `:payload_limit_bytes` or the entire buffer exceeds `:buffer_limit_bytes`, the Proc - # passed to this option will be called with the msg that would overflow the buffer. See - # the examples on how to use this properly. - # @option attributes [Symbol] :delivery_frequency_seconds (2) How often the client should - # attempt to deliver logs to the Timber API. The HTTP client buffers logs between calls. + # @option attributes [Symbol] :batch_byte_size Determines the maximum size in bytes for + # each HTTP payload. If the buffer exceeds this limit a delivery will be attempted. + # @option attributes [Symbol] :debug Whether to print debug output or not. This is also + # inferred from ENV['debug']. Output will be sent to `Timber::Config.logger`. + # @option attributes [Symbol] :flush_interval (2) How often the client should + # attempt to deliver logs to the Timber API. The HTTP client buffers logs and this + # options represents how often that will happen, assuming `:batch_byte_size` is not met. + # @option attributes [Symbol] :requests_per_conn The number of requests to send over a + # single persistent connection. After this number is met, the connection will be closed + # and a new one will be opened. + # @option attributes [Symbol] :request_queue The request queue object that queues Net::HTTP + # requests for delivery. By deafult this is a `SizedQueue` of size `3`. Meaning once + # 3 requests are placed on the queue for delivery, back pressure will be applied. IF + # you'd prefer to drop messages instead, pass a {DroppingSizedQueue}. See examples for + # an example. + # @option attributes [Symbol] :timber_url The Timber URL to delivery the log lines. The + # default is set via {TIMBER_URL}. # # @example Basic usage # Timber::Logger.new(Timber::LogDevices::HTTP.new("my_timber_api_key")) # - # @example Handling buffer overflows - # # Persist overflowed lines to a file - # # Note: You could write these to any permanent storage. - # overflow_log_path = "/path/to/my/overflow_log.log" - # overflow_handler = Proc.new { |log_line_msg| File.write(overflow_log_path, log_line_ms) } + # @example Dropping messages instead of applying back pressure # http_log_device = Timber::LogDevices::HTTP.new("my_timber_api_key", - # buffer_overflow_handler: overflow_handler) + # request_queue: Timber::LogDevices::HTTP::DroppingSizedQueue.new(3)) # Timber::Logger.new(http_log_device) def initialize(api_key, options = {}) @api_key = api_key - @buffer = TriggeredBuffer.new( - payload_limit_bytes: options[:payload_limit_bytes], - limit_bytes: options[:buffer_limit_bytes], - overflow_handler: options[:buffer_overflow_handler] - ) - @delivery_interval_thread = Thread.new do - loop do - sleep(options[:delivery_frequency_seconds] || DELIVERY_FREQUENCY_SECONDS) + @debug = options[:debug] || ENV['debug'] + @timber_url = URI.parse(options[:timber_url] || ENV['TIMBER_URL'] || TIMBER_URL) + @batch_byte_size = options[:batch_byte_size] || 3_000_000 # 3mb + @flush_interval = options[:flush_interval] || 2 # 2 seconds + @requests_per_conn = options[:requests_per_conn] || 1_000 + @msg_queue = LogMsgQueue.new(@batch_byte_size) + @request_queue = options[:request_queue] || SizedQueue.new(3) + @req_in_flight = 0 - @last_messages_overflow_count = 0 - messages_overflown_count = @buffer.messages_overflown_count - if messages_overflown_count >= @last_messages_overflow_count - difference = messages_overflown_count - @last_messages_overflow_count - @last_messages_overflow_count = messages_overflown_count - logger.warn("Timber HTTP buffer has overflown #{difference} times") - end - - buffer_for_delivery = @buffer.reserve - if buffer_for_delivery - deliver(buffer_for_delivery) - end - end + if options[:threads] != false + @outlet_thread = Thread.new { outlet } + @flush_thread = Thread.new { intervaled_flush } end end # Write a new log line message to the buffer, and deliver if the msg exceeds the # payload limit. def write(msg) - buffer_for_delivery = @buffer.write(msg) - if buffer_for_delivery - deliver(buffer_for_delivery) + @msg_queue.enqueue(msg) + if @msg_queue.full? + flush end true end # Closes the log device, cleans up, and attempts one last delivery. def close - @delivery_interval_thread.kill - buffer_for_delivery = @buffer.reserve - if buffer_for_delivery - deliver(buffer_for_delivery) - end + @flush_thread.kill if @flush_thread + @outlet_thread.kill if @outlet_thread + flush end private - def deliver(body) - Thread.new do - RETRY_LIMIT.times do |try_index| - request = Net::HTTP::Post.new(API_URI.request_uri).tap do |req| - req['Authorization'] = authorization_payload - req['Connection'] = CONNECTION_HEADER - req['Content-Type'] = CONTENT_TYPE - req['User-Agent'] = USER_AGENT - req.body = body + def debug? + !@debug.nil? + end + + def flush + msgs = @msg_queue.flush + return if msgs.empty? + + body = "" + msgs.each do |msg| + body << msg + end + + req = Net::HTTP::Post.new(@timber_url.path) + req['Authorization'] = authorization_payload + req['Content-Type'] = CONTENT_TYPE + req['User-Agent'] = USER_AGENT + req.body = body + @request_queue.enq(req) + @last_flush = Time.now + end + + def intervaled_flush + # Wait specified time period before starting + sleep @flush_interval + loop do + begin + if intervaled_flush_ready? + flush end + sleep(0.1) + rescue Exception => e + logger.error("Timber intervaled flush failed: #{e.inspect}") + end + end + end - res = HTTPS.request(request) - code = res.code.to_i - if code < 200 || code >= 300 - try = try_index + 1 - logger.debug("Timber HTTP delivery failed, try #{try} - #{res.code}: #{res.body}") - sleep(try * BACKOFF_RATE_SECONDS) - else - @buffer.remove(body) - logger.debug("Timber HTTP delivery successful - #{code}") - logger.debug("Timber new buffer size - #{@buffer.total_bytesize}") - break # exit the loop + def intervaled_flush_ready? + @last_flush.nil? || (Time.now.to_f - @last_flush.to_f).abs >= @flush_interval + end + + def outlet + loop do + http = Net::HTTP.new(@timber_url.host, @timber_url.port) + http.set_debug_output(logger) if debug? + http.use_ssl = true if @timber_url.scheme == 'https' + http.read_timeout = 30 + http.ssl_timeout = 10 + http.open_timeout = 10 + + begin + http.start do |conn| + num_reqs = 0 + while num_reqs < @requests_per_conn + #Blocks waiting for a request. + req = @request_queue.deq + @req_in_flight += 1 + resp = nil + begin + resp = conn.request(req) + rescue => e + logger.error("Timber request error: #{e.message}") if debug? + next + ensure + @req_in_flight -= 1 + end + num_reqs += 1 + logger.info("Timber request successful: #{resp.code}") if debug? + end end + rescue => e + logger.error("Timber request error: #{e.message}") if debug? + ensure + http.finish if http.started? end end end def authorization_payload \ No newline at end of file