lib/timber/log_devices/http.rb in timber-1.0.8 vs lib/timber/log_devices/http.rb in timber-1.0.9
- old
+ new
@@ -1,131 +1,230 @@
-require "timber/log_devices/http/triggered_buffer"
-
module Timber
module LogDevices
- # A log device that buffers and delivers log messages over HTTPS to the Timber API in batches.
- # The buffer and delivery strategy are very efficient and the log messages will be delivered in
- # msgpack format.
+ # A log device that buffers and delivers log messages over HTTPS to the Timber API.
+ # It uses batches, keep-alive connections, and messagepack to delivery logs with
+ # high-throughput and little overhead.
#
# See {#initialize} for options and more details.
class HTTP
- API_URI = URI.parse(ENV["TIMBER_INGESTION_URL"] || "https://logs.timber.io/frames")
- CONTENT_TYPE = "application/x-timber-msgpack-frame-1".freeze
- CONNECTION_HEADER = "keep-alive".freeze
- USER_AGENT = "Timber Ruby Gem/#{Timber::VERSION}".freeze
- HTTPS = Net::HTTP.new(API_URI.host, API_URI.port).tap do |https|
- https.use_ssl = true
- https.read_timeout = 30
- https.ssl_timeout = 10
- # Ruby 1.9.X doesn't have this setting.
- if https.respond_to?(:keep_alive_timeout=)
- https.keep_alive_timeout = 60
+ # @private
+ class LogMsgQueue
+ MAX_MSG_BYTES = 50_000 # 50kb
+
+ def initialize(max_bytes)
+ @lock = Mutex.new
+ @max_bytes = max_bytes
+ @array = []
+ @bytesize = 0
end
- https.open_timeout = 10
+
+ def enqueue(msg)
+ if msg.bytesize > MAX_MSG_BYTES
+ raise ArgumentError.new("Log message exceeds the #{MAX_MSG_BYTES} bytes limit")
+ end
+
+ @lock.synchronize do
+ @array << msg
+ @bytesize += msg.bytesize
+ end
+ end
+
+ def flush
+ @lock.synchronize do
+ old = @array
+ @array = []
+ @bytesize = 0
+ return old
+ end
+ end
+
+ def full?
+ @lock.synchronize do
+ @bytesize >= @max_bytes
+ end
+ end
+
+ def size
+ @array.size
+ end
end
+
+ # Works like SizedQueue, but drops message instead of blocking. Pass one of these in
+ # to {HTTP#intiialize} via the :request_queue option if you'd prefer to drop messages
+ # in the event of a buffer overflow instead of applying back pressure.
+ class DroppingSizedQueue < SizedQueue
+ # Returns true/false depending on whether the queue is full or not
+ def push(obj)
+ @mutex.synchronize do
+ return false unless @que.length < @max
+
+ @que.push obj
+ begin
+ t = @waiting.shift
+ t.wakeup if t
+ rescue ThreadError
+ retry
+ end
+ return true
+ end
+ end
+ end
+
+ TIMBER_URL = "https://logs.timber.io/frames".freeze
+ CONTENT_TYPE = "application/x-timber-msgpack-frame-1".freeze
+ USER_AGENT = "Timber Ruby Gem/#{Timber::VERSION}".freeze
DELIVERY_FREQUENCY_SECONDS = 2.freeze
- RETRY_LIMIT = 3.freeze
+ RETRY_LIMIT = 5.freeze
BACKOFF_RATE_SECONDS = 3.freeze
# Instantiates a new HTTP log device that can be passed to {Timber::Logger#initialize}.
#
+ # The class maintains a buffer which is flushed in batches to the Timber API. 2
+ # options control when the flush happens, `:batch_byte_size` and `:flush_interval`.
+ # If either of these are surpassed, the buffer will be flushed.
+ #
+ # By default, the buffer will apply back pressure log messages are generated faster than
+ # the client can delivery them. But you can drop messages instead by passing a
+ # {DroppingSizedQueue} via the `:request_queue` option.
+ #
# @param api_key [String] The API key provided to you after you add your application to
# [Timber](https://timber.io).
# @param [Hash] options the options to create a HTTP log device with.
- # @option attributes [Symbol] :payload_limit_bytes Determines the maximum size in bytes that
- # and HTTP payload can be. Please see {TriggereBuffer#initialize} for the default.
- # @option attributes [Symbol] :buffer_limit_bytes Determines the maximum size of the total
- # buffer. This should be many times larger than the `:payload_limit_bytes`.
- # Please see {TriggereBuffer#initialize} for the default.
- # @option attributes [Symbol] :buffer_overflow_handler (nil) When a single message exceeds
- # `:payload_limit_bytes` or the entire buffer exceeds `:buffer_limit_bytes`, the Proc
- # passed to this option will be called with the msg that would overflow the buffer. See
- # the examples on how to use this properly.
- # @option attributes [Symbol] :delivery_frequency_seconds (2) How often the client should
- # attempt to deliver logs to the Timber API. The HTTP client buffers logs between calls.
+ # @option attributes [Symbol] :batch_byte_size Determines the maximum size in bytes for
+ # each HTTP payload. If the buffer exceeds this limit a delivery will be attempted.
+ # @option attributes [Symbol] :debug Whether to print debug output or not. This is also
+ # inferred from ENV['debug']. Output will be sent to `Timber::Config.logger`.
+ # @option attributes [Symbol] :flush_interval (2) How often the client should
+ # attempt to deliver logs to the Timber API. The HTTP client buffers logs and this
+ # options represents how often that will happen, assuming `:batch_byte_size` is not met.
+ # @option attributes [Symbol] :requests_per_conn The number of requests to send over a
+ # single persistent connection. After this number is met, the connection will be closed
+ # and a new one will be opened.
+ # @option attributes [Symbol] :request_queue The request queue object that queues Net::HTTP
+ # requests for delivery. By deafult this is a `SizedQueue` of size `3`. Meaning once
+ # 3 requests are placed on the queue for delivery, back pressure will be applied. IF
+ # you'd prefer to drop messages instead, pass a {DroppingSizedQueue}. See examples for
+ # an example.
+ # @option attributes [Symbol] :timber_url The Timber URL to delivery the log lines. The
+ # default is set via {TIMBER_URL}.
#
# @example Basic usage
# Timber::Logger.new(Timber::LogDevices::HTTP.new("my_timber_api_key"))
#
- # @example Handling buffer overflows
- # # Persist overflowed lines to a file
- # # Note: You could write these to any permanent storage.
- # overflow_log_path = "/path/to/my/overflow_log.log"
- # overflow_handler = Proc.new { |log_line_msg| File.write(overflow_log_path, log_line_ms) }
+ # @example Dropping messages instead of applying back pressure
# http_log_device = Timber::LogDevices::HTTP.new("my_timber_api_key",
- # buffer_overflow_handler: overflow_handler)
+ # request_queue: Timber::LogDevices::HTTP::DroppingSizedQueue.new(3))
# Timber::Logger.new(http_log_device)
def initialize(api_key, options = {})
@api_key = api_key
- @buffer = TriggeredBuffer.new(
- payload_limit_bytes: options[:payload_limit_bytes],
- limit_bytes: options[:buffer_limit_bytes],
- overflow_handler: options[:buffer_overflow_handler]
- )
- @delivery_interval_thread = Thread.new do
- loop do
- sleep(options[:delivery_frequency_seconds] || DELIVERY_FREQUENCY_SECONDS)
+ @debug = options[:debug] || ENV['debug']
+ @timber_url = URI.parse(options[:timber_url] || ENV['TIMBER_URL'] || TIMBER_URL)
+ @batch_byte_size = options[:batch_byte_size] || 3_000_000 # 3mb
+ @flush_interval = options[:flush_interval] || 2 # 2 seconds
+ @requests_per_conn = options[:requests_per_conn] || 1_000
+ @msg_queue = LogMsgQueue.new(@batch_byte_size)
+ @request_queue = options[:request_queue] || SizedQueue.new(3)
+ @req_in_flight = 0
- @last_messages_overflow_count = 0
- messages_overflown_count = @buffer.messages_overflown_count
- if messages_overflown_count >= @last_messages_overflow_count
- difference = messages_overflown_count - @last_messages_overflow_count
- @last_messages_overflow_count = messages_overflown_count
- logger.warn("Timber HTTP buffer has overflown #{difference} times")
- end
-
- buffer_for_delivery = @buffer.reserve
- if buffer_for_delivery
- deliver(buffer_for_delivery)
- end
- end
+ if options[:threads] != false
+ @outlet_thread = Thread.new { outlet }
+ @flush_thread = Thread.new { intervaled_flush }
end
end
# Write a new log line message to the buffer, and deliver if the msg exceeds the
# payload limit.
def write(msg)
- buffer_for_delivery = @buffer.write(msg)
- if buffer_for_delivery
- deliver(buffer_for_delivery)
+ @msg_queue.enqueue(msg)
+ if @msg_queue.full?
+ flush
end
true
end
# Closes the log device, cleans up, and attempts one last delivery.
def close
- @delivery_interval_thread.kill
- buffer_for_delivery = @buffer.reserve
- if buffer_for_delivery
- deliver(buffer_for_delivery)
- end
+ @flush_thread.kill if @flush_thread
+ @outlet_thread.kill if @outlet_thread
+ flush
end
private
- def deliver(body)
- Thread.new do
- RETRY_LIMIT.times do |try_index|
- request = Net::HTTP::Post.new(API_URI.request_uri).tap do |req|
- req['Authorization'] = authorization_payload
- req['Connection'] = CONNECTION_HEADER
- req['Content-Type'] = CONTENT_TYPE
- req['User-Agent'] = USER_AGENT
- req.body = body
+ def debug?
+ !@debug.nil?
+ end
+
+ def flush
+ msgs = @msg_queue.flush
+ return if msgs.empty?
+
+ body = ""
+ msgs.each do |msg|
+ body << msg
+ end
+
+ req = Net::HTTP::Post.new(@timber_url.path)
+ req['Authorization'] = authorization_payload
+ req['Content-Type'] = CONTENT_TYPE
+ req['User-Agent'] = USER_AGENT
+ req.body = body
+ @request_queue.enq(req)
+ @last_flush = Time.now
+ end
+
+ def intervaled_flush
+ # Wait specified time period before starting
+ sleep @flush_interval
+ loop do
+ begin
+ if intervaled_flush_ready?
+ flush
end
+ sleep(0.1)
+ rescue Exception => e
+ logger.error("Timber intervaled flush failed: #{e.inspect}")
+ end
+ end
+ end
- res = HTTPS.request(request)
- code = res.code.to_i
- if code < 200 || code >= 300
- try = try_index + 1
- logger.debug("Timber HTTP delivery failed, try #{try} - #{res.code}: #{res.body}")
- sleep(try * BACKOFF_RATE_SECONDS)
- else
- @buffer.remove(body)
- logger.debug("Timber HTTP delivery successful - #{code}")
- logger.debug("Timber new buffer size - #{@buffer.total_bytesize}")
- break # exit the loop
+ def intervaled_flush_ready?
+ @last_flush.nil? || (Time.now.to_f - @last_flush.to_f).abs >= @flush_interval
+ end
+
+ def outlet
+ loop do
+ http = Net::HTTP.new(@timber_url.host, @timber_url.port)
+ http.set_debug_output(logger) if debug?
+ http.use_ssl = true if @timber_url.scheme == 'https'
+ http.read_timeout = 30
+ http.ssl_timeout = 10
+ http.open_timeout = 10
+
+ begin
+ http.start do |conn|
+ num_reqs = 0
+ while num_reqs < @requests_per_conn
+ #Blocks waiting for a request.
+ req = @request_queue.deq
+ @req_in_flight += 1
+ resp = nil
+ begin
+ resp = conn.request(req)
+ rescue => e
+ logger.error("Timber request error: #{e.message}") if debug?
+ next
+ ensure
+ @req_in_flight -= 1
+ end
+ num_reqs += 1
+ logger.info("Timber request successful: #{resp.code}") if debug?
+ end
end
+ rescue => e
+ logger.error("Timber request error: #{e.message}") if debug?
+ ensure
+ http.finish if http.started?
end
end
end
def authorization_payload
\ No newline at end of file