module OneApm
  module Collector
    class CollectorService
      # HTTP transport helpers mixed into CollectorService: building the
      # request URI, compressing and posting payloads, and managing the
      # (optionally shared) Net::HTTP connection to the collector.
      #
      # The methods here read instance state set up by the including object
      # (@marshaller, @collector, @audit_logger, @agent_id, @license_key,
      # @request_timeout) and call helpers that are not defined in this
      # module (e.g. user_agent, proxy_server, probe, log_and_return_response).
      module HttpConnection
        # These include Errno connection errors, and all indicate that the
        # underlying TCP connection may be in a bad state.
        OA_CONNECTION_ERRORS = [Timeout::Error, EOFError, SystemCallError, SocketError].freeze

        # Serializes +payload+, compresses it, posts it to the collector
        # endpoint for +method+ and returns the unmarshalled response.
        #
        # Timing metrics are always recorded on the way out; size metrics
        # are recorded only if the request body was built (i.e. +size+ was
        # assigned before any exception).
        #
        # method  - name of the remote method, placed in the request URI
        # payload - object handed to @marshaller.dump
        # options - passed to the marshaller; :item_count is forwarded to
        #           the size metrics
        def invoke_remote(method, payload = [], options = {})
          start_ts = Time.now

          data = nil
          begin
            data = @marshaller.dump(payload, options)
          rescue StandardError, SystemStackError => e
            handle_serialization_error(method, e)
          end
          serialize_finish_ts = Time.now

          data, encoding = compress_request(data)
          size = data.size

          uri = remote_method_uri(method, @marshaller.format)
          full_uri = "#{@collector}#{uri}"

          @audit_logger.log_request(full_uri, payload, @marshaller)
          response = send_request(:data => data,
                                  :uri => uri,
                                  :encoding => encoding,
                                  :collector => @collector)
          @marshaller.load(decompress_response(response))
        ensure
          # serialize_finish_ts and size are nil here if an exception was
          # raised before they were assigned.
          record_timing_supportability_metrics(method, start_ts, serialize_finish_ts)
          record_size_supportability_metrics(method, size, options[:item_count]) if size
        end

        # The path on the server that we should post our data to.
        #
        # 'run_id' and 'marshal_format' are appended only when their values
        # are truthy.
        def remote_method_uri(method, format)
          params = {'run_id' => @agent_id, 'marshal_format' => format}
          uri = "/tpm/agent.do?PROTOCOL_VERSION=#{OA_PROTOCOL_VERSION}&license_key=#{@license_key}&method=#{method}"
          pairs = params.map { |key, value| "#{key}=#{value}" if value }.compact
          uri << '&' << pairs.join('&')
          uri
        end

        # Posts to the specified server
        #
        # Options:
        #  - :uri => the path to request on the server (a misnomer of
        #              course)
        #  - :encoding => the encoding to pass to the server
        #  - :collector => a URI object that responds to the 'name' method
        #                    and returns the name of the collector to
        #                    contact
        #  - :data => the data to send as the body of the request
        #
        # On any of OA_CONNECTION_ERRORS the shared connection is closed and
        # the request is retried once; after the second failed attempt a
        # ServerConnectionException is raised.
        def send_request(opts)
          request = Net::HTTP::Post.new(opts[:uri],
                                        'CONTENT-ENCODING' => opts[:encoding],
                                        'HOST' => opts[:collector].name)
          request['user-agent'] = user_agent
          request.body = opts[:data]
          request.content_type = "multipart/form-data; boundary=-oneapm-#{request.object_id}-"

          response = nil
          attempts = 0
          max_attempts = 2

          begin
            attempts += 1
            conn = http_connection
            OneApm::Manager.logger.debug "Sending request to #{opts[:collector]}#{opts[:uri]}"
            OneApm::TimerLib.timeout(@request_timeout) do
              response = conn.request(request)
            end
          rescue *OA_CONNECTION_ERRORS => e
            # Drop the (possibly broken) shared connection so the next
            # attempt opens a fresh one.
            close_shared_connection
            if attempts < max_attempts
              OneApm::Manager.logger.debug("Retrying request to #{opts[:collector]}#{opts[:uri]} after #{e}")
              retry
            else
              raise ServerConnectionException, "Recoverable error talking to #{@collector} after #{attempts} attempts: #{e}"
            end
          end

          log_and_return_response response
        end

        # Runs the given block with @in_session set, so that http_connection
        # hands out the shared connection for the duration of the block.
        #
        # Raises ArgumentError when no block is given, and converts any of
        # OA_CONNECTION_ERRORS into OneApm::ServerConnectionException.
        def session(&block)
          raise ArgumentError, "#{self.class}#session must be passed a block" unless block_given?

          begin
            t0 = Time.now
            @in_session = true
            if OneApm::Manager.config[:aggressive_keepalive]
              session_with_keepalive(&block)
            else
              session_without_keepalive(&block)
            end
          rescue *OA_CONNECTION_ERRORS => e
            elapsed = Time.now - t0
            raise OneApm::ServerConnectionException, "Recoverable error connecting to #{@collector} after #{elapsed} seconds: #{e}"
          ensure
            @in_session = false
          end
        end

        # Runs the block on the shared connection and leaves that connection
        # open afterwards.
        def session_with_keepalive(&block)
          establish_shared_connection
          block.call
        end

        # Runs the block on the shared connection and always closes that
        # connection when the block finishes or raises.
        def session_without_keepalive(&block)
          begin
            establish_shared_connection
            block.call
          ensure
            close_shared_connection
          end
        end

        # Returns the connection to use for a request: the shared connection
        # while inside a session, otherwise a new handle from
        # create_http_connection (which is not explicitly started here).
        def http_connection
          if @in_session
            establish_shared_connection
          else
            create_http_connection
          end
        end

        # Returns the shared connection, creating and starting it first if
        # none is currently held.
        def establish_shared_connection
          unless @shared_tcp_connection
            @shared_tcp_connection = create_and_start_http_connection
          end
          @shared_tcp_connection
        end

        # Finishes the shared connection (if it was started) and forgets it.
        # Does nothing when no shared connection is held.
        def close_shared_connection
          if @shared_tcp_connection
            OneApm::Manager.logger.debug("Closing shared TCP connection to #{@shared_tcp_connection.address}:#{@shared_tcp_connection.port}")
            @shared_tcp_connection.finish if @shared_tcp_connection.started?
            @shared_tcp_connection = nil
          end
        end

        # Clears the connection's read timeout and, when aggressive
        # keepalive is configured and the connection supports it, applies
        # the configured keep-alive timeout.
        def setup_connection_timeouts(conn)
          # We use Timeout explicitly instead of this
          conn.read_timeout = nil

          if conn.respond_to?(:keep_alive_timeout) && OneApm::Manager.config[:aggressive_keepalive]
            conn.keep_alive_timeout = OneApm::Manager.config[:keep_alive_timeout]
          end
        end

        # Builds a new connection handle, starts it, and returns it.
        def create_and_start_http_connection
          conn = create_http_connection
          start_connection(conn)
          conn
        end

        # Starts +conn+ under the @request_timeout limit and returns it.
        def start_connection(conn)
          OneApm::Manager.logger.debug("Opening TCP connection to #{conn.address}:#{conn.port}")
          OneApm::TimerLib.timeout(@request_timeout) { conn.start }
          conn
        end

        # Return the Net::HTTP with proxy configuration given the OneApm::Support::Server object.
        def create_http_connection
          # Proxy returns regular HTTP if @proxy_host is nil (the default)
          http_class = Net::HTTP::Proxy(proxy_server.name, proxy_server.port,
                                        proxy_server.user, proxy_server.password)

          # Prefer the collector's IP when one is available, otherwise its name.
          conn = http_class.new((@collector.ip || @collector.name), @collector.port)
          setup_connection_for_ssl(conn) if OneApm::Manager.config[:ssl]
          setup_connection_timeouts(conn)

          OneApm::Manager.logger.debug("Created net/http handle to #{conn.address}:#{conn.port}")
          conn
        end

        # Enables SSL with peer verification on +conn+, using the
        # certificate store from ssl_cert_store.
        #
        # Any StandardError or LoadError raised while doing so is replaced
        # by an UnrecoverableAgentException.
        def setup_connection_for_ssl(conn)
          # Jruby 1.6.8 requires a gem for full ssl support and will throw
          # an error when use_ssl=(true) is called and jruby-openssl isn't
          # installed
          conn.use_ssl = true
          conn.verify_mode = OpenSSL::SSL::VERIFY_PEER
          conn.cert_store = ssl_cert_store
        rescue StandardError, LoadError
          msg = "Agent is configured to use SSL, but SSL is not available in the environment. " \
                "Either disable SSL in the agent configuration, or install SSL support."
          raise UnrecoverableAgentException, msg
        end

        # Returns an OpenSSL::X509::Store loaded from cert_file_path. The
        # store is cached and rebuilt only when the path changes.
        def ssl_cert_store
          path = cert_file_path
          if !@ssl_cert_store || path != @cached_cert_store_path
            OneApm::Manager.logger.debug("Creating SSL certificate store from file at #{path}")
            @ssl_cert_store = OpenSSL::X509::Store.new
            @ssl_cert_store.add_file(path)
            @cached_cert_store_path = path
          end
          @ssl_cert_store
        end

        # The path to the certificate file used to verify the SSL
        # connection if verify_peer is enabled
        #
        # A configured :ca_bundle_path is returned as-is (with a warning if
        # the file is missing); otherwise the bundled
        # config/cert/cacert.pem under probe.oneapm_root is used.
        def cert_file_path
          path_override = OneApm::Manager.config[:ca_bundle_path]
          if path_override
            # File.exist? rather than File.exists?: the latter was removed
            # in Ruby 3.2.
            unless File.exist?(path_override)
              OneApm::Manager.logger.warn("Couldn't find CA bundle from configured ca_bundle_path: #{path_override}")
            end
            path_override
          else
            File.expand_path(File.join(probe.oneapm_root, 'config', 'cert', 'cacert.pem'))
          end
        end

        # Compress request data
        #
        # Returns a two-element array: the encoded data and the encoding
        # name ('deflate'). Raises via check_post_size when the encoded
        # data is too large.
        def compress_request(data)
          encoding = 'deflate'
          data = OneApm::Support::Encoders::Compressed.encode(data)
          check_post_size(data)
          [data, encoding]
        end

        # Decompresses the response from the server, if it is gzip
        # encoded, otherwise returns it verbatim
        def decompress_response(response)
          if response['content-encoding'] == 'gzip'
            Zlib::GzipReader.new(StringIO.new(response.body)).read
          else
            response.body
          end
        end

        # Raises an UnrecoverableServerException if the post_string size
        # reaches or exceeds the configured :post_size_limit; returns nil
        # otherwise.
        def check_post_size(post_string)
          return if post_string.size < OneApm::Manager.config[:post_size_limit]
          OneApm::Manager.logger.debug "Tried to send too much data: #{post_string.size} bytes"
          raise UnrecoverableServerException, '413 Request Entity Too Large'
        end
      end
    end
  end
end