lib/new_relic/agent/agent.rb in newrelic_rpm-2.9.3 vs lib/new_relic/agent/agent.rb in newrelic_rpm-2.9.4

- old
+ new

@@ -200,11 +200,11 @@
          # successfully connected
          at_exit { shutdown }
        end
      end
      control.log! "New Relic RPM Agent #{NewRelic::VERSION::STRING} Initialized: pid = #{$$}"
-     control.log! "Agent Log found in #{NewRelic::Control.instance.log_file}"
+     control.log! "Agent Log found in #{NewRelic::Control.instance.log_file}" if NewRelic::Control.instance.log_file

      private
      def collector
        @collector ||= control.server
@@ -320,14 +320,14 @@
      # there's a bad license key.
      def connect
        # wait a few seconds for the web server to boot, necessary in development
        connect_retry_period = 5
        connect_attempts = 0
-
+       @agent_id = nil
        begin
          sleep connect_retry_period.to_i
-         @agent_id = invoke_remote :start, @local_host, {
+         @agent_id ||= invoke_remote :start, @local_host, {
            :pid => $$,
            :launch_time => @launch_time.to_f,
            :agent_version => NewRelic::VERSION::STRING,
            :environment => control.local_env.snapshot,
            :settings => control.settings }
@@ -373,20 +373,18 @@
            log.debug e.backtrace.join("\n")
          end
          # retry logic
          connect_attempts += 1
          case connect_attempts
-         when 1..5
-           connect_retry_period, period_msg = 5, nil
-         when 6..10 then
-           connect_retry_period, period_msg = 30, nil
-         when 11..20 then
+         when 1..2
            connect_retry_period, period_msg = 60, "1 minute"
+         when 3..5 then
+           connect_retry_period, period_msg = 60 * 2, "2 minutes"
          else
            connect_retry_period, period_msg = 10*60, "10 minutes"
          end
-         log.info "Will re-attempt in #{period_msg}" if period_msg
+         log.info "Will re-attempt in #{period_msg}"
          retry
        end
      end

      def determine_host
@@ -443,19 +441,28 @@
        @traces = @transaction_sampler.harvest(@traces, @slowest_transaction_threshold)

        unless @traces.empty?
          now = Time.now
          log.debug "Sending (#{@traces.length}) transaction traces"
+         begin
+           # take the traces and prepare them for sending across the
+           # wire. This includes gathering SQL explanations, stripping
+           # out stack traces, and normalizing SQL. note that we
+           # explain only the sql statements whose segments' execution
+           # times exceed our threshold (to avoid unnecessary overhead
+           # of running explains on fast queries.)
+           traces = @traces.collect {|trace| trace.prepare_to_send(:explain_sql => @explain_threshold, :record_sql => @record_sql, :keep_backtraces => true, :explain_enabled => @explain_enabled)}
+
+
+           invoke_remote :transaction_sample_data, @agent_id, traces
+         rescue PostTooBigException
+           # we tried to send too much data, drop the first trace and
+           # try again
+           @traces.shift
+           retry
+         end
-         # take the traces and prepare them for sending across the wire. This includes
-         # gathering SQL explanations, stripping out stack traces, and normalizing SQL.
-         # note that we explain only the sql statements whose segments' execution times exceed
-         # our threshold (to avoid unnecessary overhead of running explains on fast queries.)
-         traces = @traces.collect {|trace| trace.prepare_to_send(:explain_sql => @explain_threshold, :record_sql => @record_sql, :keep_backtraces => true, :explain_enabled => @explain_enabled)}
-
-         invoke_remote :transaction_sample_data, @agent_id, traces
-
          log.debug "#{now}: sent slowest sample (#{@agent_id}) in #{Time.now - now} seconds"
        end

        # if we succeed sending this sample, then we don't need to keep
        # the slowest sample around - it has been sent already and we
@@ -470,71 +477,112 @@
      def harvest_and_send_errors
        @unsent_errors = @error_collector.harvest_errors(@unsent_errors)
        if @unsent_errors && @unsent_errors.length > 0
          log.debug "Sending #{@unsent_errors.length} errors"
-
-         invoke_remote :error_data, @agent_id, @unsent_errors
-
-         # if the remote invocation fails, then we never clear @unsent_errors,
-         # and therefore we can re-attempt to send on the next heartbeat. Note
-         # the error collector maxes out at 20 instances to prevent leakage
+         begin
+           invoke_remote :error_data, @agent_id, @unsent_errors
+         rescue PostTooBigException
+           @unsent_errors.shift
+           retry
+         end
+         # if the remote invocation fails, then we never clear
+         # @unsent_errors, and therefore we can re-attempt to send on
+         # the next heartbeat. Note the error collector maxes out at
+         # 20 instances to prevent leakage
          @unsent_errors = []
        end
      end
-
-     # send a message via post
-     def invoke_remote(method, *args)
-       # we currently optimize for CPU here since we get roughly a 10x reduction in
-       # message size with this, and CPU overhead is at a premium. If we wanted
-       # to go for higher compression instead, we could use Zlib::BEST_COMPRESSION and
-       # pay a little more CPU.
-       data = Marshal.dump(args)
-       encoding = data.size > 2000 ? 'deflate' : 'identity' # don't compress small payloads
-       post_data = encoding == 'deflate' ? Zlib::Deflate.deflate(data, Zlib::BEST_SPEED) : data
-       http = control.http_connection(collector)
+
+     def compress_data(object)
+       dump = Marshal.dump(object)
-       uri = "/agent_listener/#{PROTOCOL_VERSION}/#{control.license_key}/#{method}"
-       uri += "?run_id=#{@agent_id}" if @agent_id
+       # we currently optimize for CPU here since we get roughly a 10x
+       # reduction in message size with this, and CPU overhead is at a
+       # premium. For extra-large posts, we use the higher compression
+       # since otherwise it actually errors out.
-       request = Net::HTTP::Post.new(uri, 'CONTENT-ENCODING' => encoding, 'ACCEPT-ENCODING' => 'gzip', 'HOST' => collector.name)
+       dump_size = dump.size
+
+       # small payloads don't need compression
+       return [dump, 'identity'] if dump_size < 2000
+
+       # medium payloads get fast compression, to save CPU
+       # big payloads get all the compression possible, to stay under
+       # the 2,000,000 byte post threshold
+       compression = dump_size < 2000000 ? Zlib::BEST_SPEED : Zlib::BEST_COMPRESSION
+
+       [Zlib::Deflate.deflate(dump, compression), 'deflate']
+     end
+
+     def check_post_size(post_string)
+       # TODO: define this as a config option on the server side
+       return if post_string.size < 2000000
+       log.warn "Tried to send too much data, retrying with less: #{post_string.size} bytes"
+       raise PostTooBigException
+     end
+
+     def send_request(opts)
+       request = Net::HTTP::Post.new(opts[:uri], 'CONTENT-ENCODING' => opts[:encoding], 'ACCEPT-ENCODING' => 'gzip', 'HOST' => opts[:collector].name)
        request.content_type = "application/octet-stream"
-       request.body = post_data
+       request.body = opts[:data]
-       log.debug "connect to #{collector}#{uri}"
+       log.debug "connect to #{opts[:collector]}#{opts[:uri]}"
        response = nil
-
+       http = control.http_connection(collector)
        begin
          timeout(@request_timeout) do
            response = http.request(request)
          end
        rescue Timeout::Error
          log.warn "Timed out trying to post data to RPM (timeout = #{@request_timeout} seconds)" unless @request_timeout < 30
          raise IgnoreSilentlyException
        end
-       if response.is_a? Net::HTTPSuccess
-         body = nil
-         if response['content-encoding'] == 'gzip'
-           log.debug "Decompressing return value"
-           i = Zlib::GzipReader.new(StringIO.new(response.body))
-           body = i.read
-         else
-           log.debug "Uncompressed content returned"
-           body = response.body
-         end
-         return_value = Marshal.load(body)
-         if return_value.is_a? Exception
-           raise return_value
-         else
-           return return_value
-         end
-       else
+       if !(response.is_a? Net::HTTPSuccess)
          log.debug "Unexpected response from server: #{response.code}: #{response.message}"
          raise IgnoreSilentlyException
-       end
+       end
+       response
+     end
+
+     def decompress_response(response)
+       if response['content-encoding'] != 'gzip'
+         log.debug "Uncompressed content returned"
+         return response.body
+       end
+       log.debug "Decompressing return value"
+       i = Zlib::GzipReader.new(StringIO.new(response.body))
+       i.read
+     end
+
+     def check_for_exception(response)
+       dump = decompress_response(response)
+       value = Marshal.load(dump)
+       raise value if value.is_a? Exception
+       value
+     end
+
+     def remote_method_uri(method)
+       uri = "/agent_listener/#{PROTOCOL_VERSION}/#{control.license_key}/#{method}"
+       uri << "?run_id=#{@agent_id}" if @agent_id
+       uri
+     end
+
+     # send a message via post
+     def invoke_remote(method, *args)
+       #determines whether to zip the data or send plain
+       post_data, encoding = compress_data(args)
+
+       # this checks to make sure mongrel won't choke on big uploads
+       check_post_size(post_data)
+
+       response = send_request({:uri => remote_method_uri(method), :encoding => encoding, :collector => collector, :data => post_data})
+
+       # raises the right exception if the remote server tells it to die
+       return check_for_exception(response)
     rescue ForceDisconnectException => e
       log.error "RPM forced this agent to disconnect (#{e.message})\n" \
                 "Restart this process to resume monitoring via rpm.newrelic.com."
       # when a disconnect is requested, stop the current thread, which
       # is the worker thread that gathers data and talks to the
@@ -560,10 +608,10 @@
            log.debug "Graceful shutdown complete"
          rescue Timeout::Error, StandardError
          end
        else
-         log.debug "Bypassing graceful shutdown - agent in development mode"
+         log.debug "Bypassing graceful shutdown - agent not connected"
        end
      end
    end
  end
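A note on the connect retry change: the old schedule started at 5 seconds and had no human-readable period until the minute mark (period_msg was nil before then), which is why the old log.info was guarded with "if period_msg". The new schedule always has a message, so the guard is dropped. A standalone sketch of the new ladder (retry_period is a hypothetical helper, not part of the agent):

# Hypothetical helper mirroring the new case statement in #connect.
def retry_period(connect_attempts)
  case connect_attempts
  when 1..2 then [60, "1 minute"]
  when 3..5 then [60 * 2, "2 minutes"]
  else [10 * 60, "10 minutes"]
  end
end

(1..7).each do |attempt|
  seconds, msg = retry_period(attempt)
  puts "attempt #{attempt}: sleep #{seconds}s, then 'Will re-attempt in #{msg}'"
end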
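Both harvest methods now wrap invoke_remote in the same shed-and-retry shape: when the post is rejected for size, drop the oldest queued item and retry with a smaller payload. A minimal sketch of that control flow (send_shedding_oldest and the block are illustrative stand-ins; only PostTooBigException matches the agent's name):

class PostTooBigException < StandardError; end

# Retries a send, shedding the oldest item each time the payload is
# rejected for size; the block plays the role of invoke_remote.
def send_shedding_oldest(items)
  begin
    yield items unless items.empty?
  rescue PostTooBigException
    items.shift # drop the oldest item, keep the rest
    retry
  end
end

queue = (1..5).to_a
send_shedding_oldest(queue) do |items|
  raise PostTooBigException if items.length > 2 # pretend 3+ items is too big
  puts "sent #{items.inspect}"
end
# prints: sent [4, 5]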
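The new compress_data makes its encoding decision purely on the marshalled size, with three tiers instead of the old two. A self-contained sketch of the same decision (choose_encoding is a hypothetical name; the 2,000 and 2,000,000 byte cutoffs are the values hard-coded in the diff above):

require 'zlib'

def choose_encoding(dump)
  # tiny payloads: compression costs more CPU than the bytes are worth
  return [dump, 'identity'] if dump.size < 2_000
  # medium payloads get the fastest level; near-limit payloads get the
  # best ratio, to stay under the 2,000,000-byte post cap
  level = dump.size < 2_000_000 ? Zlib::BEST_SPEED : Zlib::BEST_COMPRESSION
  [Zlib::Deflate.deflate(dump, level), 'deflate']
end

body, encoding = choose_encoding(Marshal.dump(Array.new(1_000) { "sample payload" }))
puts "#{encoding}: #{body.size} bytes" # deflate, far smaller than the input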
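The refactor splits the monolithic invoke_remote into compress_data, check_post_size, send_request, decompress_response, and check_for_exception, but the wire convention is unchanged: the collector marshals its return value into a (possibly gzipped) body, and if that value is an exception the client raises it locally. A roundtrip sketch of the convention; gzip_body and load_body are illustrative stand-ins for the server and client halves:

require 'zlib'
require 'stringio'

# Server half: marshal a return value (or an exception) and gzip it.
def gzip_body(value)
  io = StringIO.new
  gz = Zlib::GzipWriter.new(io)
  gz.write(Marshal.dump(value))
  gz.close
  io.string
end

# Client half: mirrors decompress_response + check_for_exception.
def load_body(body)
  dump = Zlib::GzipReader.new(StringIO.new(body)).read
  value = Marshal.load(dump)
  raise value if value.is_a?(Exception)
  value
end

puts load_body(gzip_body([1, 2, 3])).inspect # => [1, 2, 3]

begin
  load_body(gzip_body(RuntimeError.new("collector says disconnect")))
rescue RuntimeError => e
  puts "raised client-side: #{e.message}"
end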