require 'addressable/uri'
require 'http'
require 'json'
require 'timeout'

require 'libhoney/response'

module Libhoney
  # @api private
  class TransmissionClient
    def initialize(max_batch_size: 50,
                   send_frequency: 100,
                   max_concurrent_batches: 10,
                   pending_work_capacity: 1000,
                   send_timeout: 10,
                   responses: nil,
                   block_on_send: false,
                   block_on_responses: false,
                   user_agent_addition: nil)
      @responses = responses || SizedQueue.new(pending_work_capacity * 2)
      @block_on_send = block_on_send
      @block_on_responses = block_on_responses
      @max_batch_size = max_batch_size

      # convert from milliseconds to seconds
      @send_frequency = send_frequency.fdiv(1000)

      @max_concurrent_batches = max_concurrent_batches
      @pending_work_capacity = pending_work_capacity
      @send_timeout = send_timeout
      @user_agent = build_user_agent(user_agent_addition).freeze

      @send_queue = Queue.new
      @threads = []
      @lock = Mutex.new

      # use a SizedQueue so the producer will block on adding to the
      # batch_queue when @block_on_send is true
      @batch_queue = SizedQueue.new(@pending_work_capacity)
      @batch_thread = nil
    end

    def add(event)
      raise ArgumentError, "No APIHost for Honeycomb. Can't send to the Great Unknown." if event.api_host == ''
      raise ArgumentError, "No WriteKey specified. Can't send event." if event.writekey == ''
      raise ArgumentError, "No Dataset for Honeycomb. Can't send datasetless." if event.dataset == ''

      begin
        @batch_queue.enq(event, !@block_on_send)
      rescue ThreadError
        # happens if the queue was full and block_on_send is false
      end

      ensure_threads_running
    end

    def send_loop
      http_clients = build_http_clients

      # eat events until we run out
      loop do
        api_host, writekey, dataset, batch = @send_queue.pop
        break if batch.nil?

        before = Time.now

        begin
          http = http_clients[api_host]
          body = serialize_batch(batch)
          next if body.nil?

          headers = { 'Content-Type' => 'application/json', 'X-Honeycomb-Team' => writekey }

          response = http.post(
            "/1/batch/#{Addressable::URI.escape(dataset)}",
            body: body,
            headers: headers
          )
          process_response(response, before, batch)
        rescue Exception => e
          # catch a broader swath of exceptions than is usually good practice,
          # because this is effectively the top-level exception handler for the
          # sender threads, and we don't want those threads to die (leaving
          # nothing consuming the queue).
          begin
            batch.each do |event|
              # nil events in the batch should already have had an error
              # response enqueued in #serialize_batch
              next if event.nil?
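
              # fan the single transport error out to every remaining event
              # in the batch so consumers of the responses queue still
              # receive one response per event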
              Response.new(error: e).tap do |error_response|
                error_response.metadata = event.metadata
                enqueue_response(error_response)
              end
            end
          rescue ThreadError
          end
        end
      end
    ensure
      http_clients.each do |_, http|
        begin
          http.close
        rescue StandardError
          nil
        end
      end
    end

    def close(drain)
      # if drain is false, clear the remaining unprocessed events from the queues
      unless drain
        @batch_queue.clear
        @send_queue.clear
      end

      @batch_queue.enq(nil)
      @batch_thread.join unless @batch_thread.nil?

      # send @threads.length number of nils so each thread will fall out of send_loop
      @threads.length.times { @send_queue << nil }
      @threads.each(&:join)
      @threads = []

      enqueue_response(nil)

      0
    end

    def batch_loop
      next_send_time = Time.now + @send_frequency
      batched_events = Hash.new do |h, key|
        h[key] = []
      end

      loop do
        begin
          # pop events off the batch_queue and group them by destination;
          # a nil on the queue is falsy, ends this while, and breaks the loop
          while (event = Timeout.timeout(@send_frequency) { @batch_queue.pop })
            key = [event.api_host, event.writekey, event.dataset]
            batched_events[key] << event
          end

          break
        rescue Exception
          # Timeout::Error is raised when the queue sits idle for
          # @send_frequency seconds; swallow it so the batching thread stays
          # alive, then flush in the ensure below
        ensure
          next_send_time = flush_batched_events(batched_events) if Time.now > next_send_time
        end
      end

      flush_batched_events(batched_events)
    end

    private

    ##
    # Enqueues a response onto the responses queue, suppressing ThreadError
    # when there is no space left on the queue and we are not blocking on
    # responses.
    #
    def enqueue_response(response)
      @responses.enq(response, !@block_on_responses)
    rescue ThreadError
    end

    def process_response(http_response, before, batch)
      index = 0
      http_response.parse.each do |event|
        # skip batch entries that were nil'd out by #serialize_batch; they
        # have already received an error response
        index += 1 while batch[index].nil? && index < batch.size
        break unless (batched_event = batch[index])

        Response.new(status_code: event['status']).tap do |response|
          response.duration = Time.now - before
          response.metadata = batched_event.metadata
          enqueue_response(response)
        end

        # advance past the entry just handled so the next server status is
        # paired with the next event rather than this one again
        index += 1
      end
    end

    def serialize_batch(batch)
      payload = []

      batch.map! do |event|
        begin
          e = {
            time: event.timestamp.iso8601(3),
            samplerate: event.sample_rate,
            data: event.data
          }
          payload << JSON.generate(e)
          event
        rescue StandardError => e
          # enqueue an error response for the event that failed to serialize
          # and replace it with nil in the batch
          Response.new(error: e).tap do |response|
            response.metadata = event.metadata
            enqueue_response(response)
          end
          nil
        end
      end

      return if payload.empty?

      "[#{payload.join(',')}]"
    end

    def build_user_agent(user_agent_addition)
      ua = "libhoney-rb/#{VERSION}"
      ua << " #{user_agent_addition}" if user_agent_addition
      ua
    end

    def ensure_threads_running
      @lock.synchronize do
        @batch_thread = Thread.new { batch_loop } unless @batch_thread && @batch_thread.alive?

        @threads.select!(&:alive?)
        @threads << Thread.new { send_loop } while @threads.length < @max_concurrent_batches
      end
    end

    def flush_batched_events(batched_events)
      batched_events.each do |(api_host, writekey, dataset), events|
        events.each_slice(@max_batch_size) do |batch|
          @send_queue << [api_host, writekey, dataset, batch]
        end
      end
      batched_events.clear

      Time.now + @send_frequency
    end

    def build_http_clients
      Hash.new do |h, api_host|
        h[api_host] = HTTP.timeout(connect: @send_timeout, write: @send_timeout, read: @send_timeout)
                          .persistent(api_host)
                          .headers(
                            'User-Agent' => @user_agent,
                            'Content-Type' => 'application/json'
                          )
      end
    end
  end
end
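
# A minimal usage sketch (illustration only, not part of the library). It
# assumes an `event` object exposing the accessors this client reads:
# #api_host, #writekey, #dataset, #sample_rate, #timestamp, #data, and
# #metadata (e.g. an event built by the enclosing Libhoney client).
#
#   transmission = Libhoney::TransmissionClient.new(max_batch_size: 50,
#                                                   send_frequency: 100)
#   transmission.add(event)  # spawns the batch/send threads on first use
#   transmission.close(true) # drain = true: flush pending batches first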