lib/elastic_apm/transport/connection/http.rb in elastic-apm-3.1.0 vs lib/elastic_apm/transport/connection/http.rb in elastic-apm-3.2.0
- old
+ new
@@ -1,37 +1,53 @@
# frozen_string_literal: true
-require 'http'
-require 'concurrent'
-require 'zlib'
-
require 'elastic_apm/transport/connection/proxy_pipe'
module ElasticAPM
module Transport
class Connection
# @api private
class Http
include Logging
- def initialize(config)
+ def initialize(config, headers: nil)
@config = config
- @closed = Concurrent::AtomicBoolean.new
+ @headers = headers || Headers.new(config)
+ @client = build_client
+ @closed = Concurrent::AtomicBoolean.new(true)
+ end
+ def open(url)
+ @closed.make_false
@rd, @wr = ProxyPipe.pipe(compress: @config.http_compression?)
+ @request = open_request_in_thread(url)
end
- def open(url, headers: {}, ssl_context: nil)
- @request = open_request_in_thread(url, headers, ssl_context)
- end
-
- def self.open(config, url, headers: {}, ssl_context: nil)
+ def self.open(config, url)
new(config).tap do |http|
- http.open(url, headers: headers, ssl_context: ssl_context)
+ http.open(url)
end
end
+ def post(url, body: nil, headers: nil)
+ request(:post, url, body: body, headers: headers)
+ end
+
+ def get(url, headers: nil)
+ request(:get, url, headers: headers)
+ end
+
+ def request(method, url, body: nil, headers: nil)
+ @client.send(
+ method,
+ url,
+ body: body,
+ headers: (headers ? @headers.merge(headers) : @headers).to_h,
+ ssl_context: @config.ssl_context
+ ).flush
+ end
+
def write(str)
@wr.write(str)
@wr.bytes_sent
end
@@ -67,49 +83,41 @@
def thread_str
format('[THREAD:%s]', Thread.current.object_id)
end
- # rubocop:disable Metrics/LineLength
- def open_request_in_thread(url, headers, ssl_context)
- client = build_client(headers)
-
+ # rubocop:disable Metrics/MethodLength
+ def open_request_in_thread(url)
debug '%s: Opening new request', thread_str
Thread.new do
begin
- post(client, url, ssl_context)
+ resp = post(url, body: @rd, headers: @headers.chunked.to_h)
+
+ if resp&.status == 202
+ debug 'APM Server responded with status 202'
+ elsif resp
+ error "APM Server responded with an error:\n%p", resp.body.to_s
+ end
rescue Exception => e
- error "Couldn't establish connection to APM Server:\n%p", e.inspect
+ error(
+ "Couldn't establish connection to APM Server:\n%p", e.inspect
+ )
end
end
end
- # rubocop:enable Metrics/LineLength
+ # rubocop:enable Metrics/MethodLength
- def build_client(headers)
- client = HTTP.headers(headers)
+ def build_client
+ client = HTTP.headers(@headers)
return client unless @config.proxy_address && @config.proxy_port
client.via(
@config.proxy_address,
@config.proxy_port,
@config.proxy_username,
@config.proxy_password,
@config.proxy_headers
)
- end
-
- def post(client, url, ssl_context)
- resp = client.post(
- url,
- body: @rd,
- ssl_context: ssl_context
- ).flush
-
- if resp&.status == 202
- debug 'APM Server responded with status 202'
- elsif resp
- error "APM Server responded with an error:\n%p", resp.body.to_s
- end
end
end
end
end
end