lib/elastic_apm/transport/connection/http.rb in elastic-apm-3.1.0 vs lib/elastic_apm/transport/connection/http.rb in elastic-apm-3.2.0

- old
+ new

@@ -1,37 +1,53 @@ # frozen_string_literal: true -require 'http' -require 'concurrent' -require 'zlib' - require 'elastic_apm/transport/connection/proxy_pipe' module ElasticAPM module Transport class Connection # @api private class Http include Logging - def initialize(config) + def initialize(config, headers: nil) @config = config - @closed = Concurrent::AtomicBoolean.new + @headers = headers || Headers.new(config) + @client = build_client + @closed = Concurrent::AtomicBoolean.new(true) + end + def open(url) + @closed.make_false @rd, @wr = ProxyPipe.pipe(compress: @config.http_compression?) + @request = open_request_in_thread(url) end - def open(url, headers: {}, ssl_context: nil) - @request = open_request_in_thread(url, headers, ssl_context) - end - - def self.open(config, url, headers: {}, ssl_context: nil) + def self.open(config, url) new(config).tap do |http| - http.open(url, headers: headers, ssl_context: ssl_context) + http.open(url) end end + def post(url, body: nil, headers: nil) + request(:post, url, body: body, headers: headers) + end + + def get(url, headers: nil) + request(:get, url, headers: headers) + end + + def request(method, url, body: nil, headers: nil) + @client.send( + method, + url, + body: body, + headers: (headers ? @headers.merge(headers) : @headers).to_h, + ssl_context: @config.ssl_context + ).flush + end + def write(str) @wr.write(str) @wr.bytes_sent end @@ -67,49 +83,41 @@ def thread_str format('[THREAD:%s]', Thread.current.object_id) end - # rubocop:disable Metrics/LineLength - def open_request_in_thread(url, headers, ssl_context) - client = build_client(headers) - + # rubocop:disable Metrics/MethodLength + def open_request_in_thread(url) debug '%s: Opening new request', thread_str Thread.new do begin - post(client, url, ssl_context) + resp = post(url, body: @rd, headers: @headers.chunked.to_h) + + if resp&.status == 202 + debug 'APM Server responded with status 202' + elsif resp + error "APM Server responded with an error:\n%p", resp.body.to_s + end rescue Exception => e - error "Couldn't establish connection to APM Server:\n%p", e.inspect + error( + "Couldn't establish connection to APM Server:\n%p", e.inspect + ) end end end - # rubocop:enable Metrics/LineLength + # rubocop:enable Metrics/MethodLength - def build_client(headers) - client = HTTP.headers(headers) + def build_client + client = HTTP.headers(@headers) return client unless @config.proxy_address && @config.proxy_port client.via( @config.proxy_address, @config.proxy_port, @config.proxy_username, @config.proxy_password, @config.proxy_headers ) - end - - def post(client, url, ssl_context) - resp = client.post( - url, - body: @rd, - ssl_context: ssl_context - ).flush - - if resp&.status == 202 - debug 'APM Server responded with status 202' - elsif resp - error "APM Server responded with an error:\n%p", resp.body.to_s - end end end end end end