require 'libhoney/response'

module Libhoney
  # Background sender for Honeycomb events.
  #
  # Events are pushed onto a bounded queue by #add and drained by a pool of
  # sender threads (#send_loop), each of which posts events over a persistent
  # HTTP connection and publishes the outcome on the responses queue.
  #
  # @api private
  class TransmissionClient
    # @param max_batch_size [Integer] stored for batching; not read by the
    #   current per-event send path.
    # @param send_frequency [Integer] stored; not read by the current send path.
    # @param max_concurrent_batches [Integer] number of sender threads to keep alive.
    # @param pending_work_capacity [Integer] capacity of the send queue; the
    #   default responses queue gets twice this capacity.
    # @param responses [SizedQueue, nil] queue Response objects are published
    #   to; one is created when not supplied.
    # @param block_on_send [Boolean] when true, #add blocks on a full send
    #   queue instead of dropping the event.
    # @param block_on_responses [Boolean] when true, sender threads block on a
    #   full responses queue instead of dropping the response.
    # @param user_agent_addition [String, nil] appended to the User-Agent header.
    def initialize(max_batch_size: 50,
                   send_frequency: 100,
                   max_concurrent_batches: 10,
                   pending_work_capacity: 1000,
                   responses: nil,
                   block_on_send: false,
                   block_on_responses: false,
                   user_agent_addition: nil)
      @responses = responses || SizedQueue.new(pending_work_capacity * 2)
      @block_on_send = block_on_send
      @block_on_responses = block_on_responses
      @max_batch_size = max_batch_size
      @send_frequency = send_frequency
      @max_concurrent_batches = max_concurrent_batches
      @pending_work_capacity = pending_work_capacity
      @user_agent = build_user_agent(user_agent_addition).freeze

      # use a SizedQueue so the producer will block on adding to the
      # send_queue when @block_on_send is true
      @send_queue = SizedQueue.new(@pending_work_capacity)
      @threads = []
      @lock = Mutex.new
    end

    # Enqueues an event for asynchronous delivery and makes sure sender
    # threads are running.
    #
    # @param event [#api_host, #writekey, #dataset] the event to send.
    # @raise [ArgumentError] when api_host, writekey, or dataset is empty.
    def add(event)
      raise ArgumentError, "No APIHost for Honeycomb. Can't send to the Great Unknown." if event.api_host == ''
      raise ArgumentError, "No WriteKey specified. Can't send event." if event.writekey == ''
      raise ArgumentError, "No Dataset for Honeycomb. Can't send datasetless." if event.dataset == ''

      begin
        # Non-blocking enq raises ThreadError on a full queue.
        @send_queue.enq(event, !@block_on_send)
      rescue ThreadError
        # happens if the queue was full and block_on_send = false;
        # the event is deliberately dropped.
      end

      ensure_threads_running
    end

    # Body of each sender thread: pops events off the send queue and posts
    # them to Honeycomb until a nil sentinel arrives (pushed by #close).
    def send_loop
      # Lazily build one persistent HTTP client per api_host.
      http_clients = Hash.new do |clients, api_host|
        clients[api_host] = HTTP.persistent(api_host).headers(
          'User-Agent' => @user_agent,
          'Content-Type' => 'application/json',
        )
      end

      # eat events until we run out
      loop do
        event = @send_queue.pop
        break if event.nil?

        before = Time.now

        begin
          http = http_clients[event.api_host]
          # URI::DEFAULT_PARSER.escape replaces URI.escape, which is
          # deprecated since Ruby 2.7 and removed in 3.0, with the same
          # escaping behavior.
          resp = http.post("/1/events/#{URI::DEFAULT_PARSER.escape(event.dataset)}",
                           json: event.data,
                           headers: {
                             'X-Honeycomb-Team' => event.writekey,
                             'X-Honeycomb-SampleRate' => event.sample_rate,
                             'X-Event-Time' => event.timestamp.iso8601(3)
                           })
          # "You must consume response before sending next request via persistent connection"
          # https://github.com/httprb/http/wiki/Persistent-Connections-%28keep-alive%29#note-using-persistent-requests-correctly
          resp.flush
          response = Response.new(status_code: resp.status)
        rescue Exception => error
          # catch a broader swath of exceptions than is usually good practice,
          # because this is effectively the top-level exception handler for the
          # sender threads, and we don't want those threads to die (leaving
          # nothing consuming the queue).
          response = Response.new(error: error)
        ensure
          if response
            response.duration = Time.now - before
            response.metadata = event.metadata
          end
        end

        begin
          # Non-blocking enq raises ThreadError on a full queue.
          @responses.enq(response, !@block_on_responses) if response
        rescue ThreadError
          # happens if the queue was full and block_on_responses = false;
          # the response is deliberately dropped.
        end
      end
    ensure
      # Close every persistent connection we opened; best effort, ignore
      # errors during close.
      http_clients.each_value do |http|
        http.close rescue nil
      end
    end

    # Shuts down the sender threads.
    #
    # @param drain [Boolean] when exactly false, pending events are discarded
    #   first; any other value (including nil) drains the queue.
    # @return [Integer] always 0.
    def close(drain)
      # if drain is false, clear the remaining unprocessed events from the
      # queue. NOTE: the exact `== false` comparison is intentional — nil
      # must NOT clear the queue.
      @send_queue.clear if drain == false

      # send @threads.length number of nils so each thread will fall out
      # of send_loop
      @threads.length.times { @send_queue << nil }
      @threads.each(&:join)
      @threads = []

      # nil sentinel tells responses consumers we are done.
      @responses.enq(nil)

      0
    end

    private

    # Builds the User-Agent header value, optionally suffixed with a
    # caller-supplied addition.
    def build_user_agent(user_agent_addition)
      ua = "libhoney-rb/#{VERSION}"
      ua << " #{user_agent_addition}" if user_agent_addition
      ua
    end

    # Reaps dead sender threads and starts new ones until
    # @max_concurrent_batches are alive. Guarded by @lock so concurrent
    # #add calls don't over-spawn threads.
    def ensure_threads_running
      @lock.synchronize do
        @threads.select!(&:alive?)
        @threads << Thread.new { send_loop } while @threads.length < @max_concurrent_batches
      end
    end
  end
end