lib/new_relic/agent/agent.rb in newrelic_rpm-3.3.5 vs lib/new_relic/agent/agent.rb in newrelic_rpm-3.4.0.beta1

- old
+ new

@@ -3,51 +3,39 @@ require 'net/http' require 'logger' require 'zlib' require 'stringio' require 'new_relic/data_serialization' +require 'new_relic/agent/new_relic_service' +require 'new_relic/agent/pipe_service' module NewRelic module Agent # The Agent is a singleton that is instantiated when the plugin is # activated. It collects performance data from ruby applications # in realtime as the application runs, and periodically sends that # data to the NewRelic server. class Agent - - # Specifies the version of the agent's communication protocol with - # the NewRelic hosted site. - - PROTOCOL_VERSION = 8 - # 14105: v8 (tag 2.10.3) - # (no v7) - # 10379: v6 (not tagged) - # 4078: v5 (tag 2.5.4) - # 2292: v4 (tag 2.3.6) - # 1754: v3 (tag 2.3.0) - # 534: v2 (shows up in 2.1.0, our first tag) - - def initialize - @launch_time = Time.now @metric_ids = {} @stats_engine = NewRelic::Agent::StatsEngine.new @transaction_sampler = NewRelic::Agent::TransactionSampler.new @sql_sampler = NewRelic::Agent::SqlSampler.new @stats_engine.transaction_sampler = @transaction_sampler @error_collector = NewRelic::Agent::ErrorCollector.new @connect_attempts = 0 - @request_timeout = NewRelic::Control.instance.fetch('timeout', 2 * 60) - @last_harvest_time = Time.now @obfuscator = lambda {|sql| NewRelic::Agent::Database.default_sql_obfuscator(sql) } + @forked = false + + @service = NewRelic::Agent::NewRelicService.new(control.license_key, control.server) end - + # contains all the class-level methods for NewRelic::Agent::Agent module ClassMethods # Should only be called by NewRelic::Control - returns a # memoized singleton instance of the agent, creating one if needed def instance @@ -79,10 +67,11 @@ attr_reader :url_rules # a configuration for the Real User Monitoring system - # handles things like static setup of the header for inclusion # into pages attr_reader :beacon_configuration + attr_accessor :service # Returns the length of the unsent errors array, if it exists, # otherwise nil def unsent_errors_size @unsent_errors.length if @unsent_errors @@ -156,15 +145,20 @@ # had not connected. # * <tt>:keep_retrying => false</tt> if we try to initiate a new # connection, this tells me to only try it once so this method returns # quickly if there is some kind of latency with the server. def after_fork(options={}) - + @forked = true # @connected gets false after we fail to connect or have an error # connecting. @connected has nil if we haven't finished trying to connect. # or we didn't attempt a connection because this is the master process - + + if channel_id = options[:report_to_channel] + @service = NewRelic::Agent::PipeService.new(channel_id) + @connected_pid = $$ + end + # log.debug "Agent received after_fork notice in #$$: [#{control.agent_enabled?}; monitor=#{control.monitor_mode?}; connected: #{@connected.inspect}; thread=#{@worker_thread.inspect}]" return if !control.agent_enabled? or !control.monitor_mode? or @connected == false or @worker_thread && @worker_thread.alive? @@ -177,11 +171,15 @@ # Don't ever check to see if this is a spawner. If we're in a forked process # I'm pretty sure we're not also forking new instances. start_worker_thread(options) @stats_engine.start_sampler_thread end - + + def forked? + @forked + end + # True if we have initialized and completed 'start' def started? @started end @@ -201,29 +199,29 @@ run_loop_before_exit = options.fetch(:force_send, false) return if not started? if @worker_loop @worker_loop.run_task if run_loop_before_exit @worker_loop.stop + end - log.debug "Starting Agent shutdown" + log.debug "Starting Agent shutdown" - # if litespeed, then ignore all future SIGUSR1 - it's - # litespeed trying to shut us down + # if litespeed, then ignore all future SIGUSR1 - it's + # litespeed trying to shut us down - if control.dispatcher == :litespeed - Signal.trap("SIGUSR1", "IGNORE") - Signal.trap("SIGTERM", "IGNORE") - end + if control.dispatcher == :litespeed + Signal.trap("SIGUSR1", "IGNORE") + Signal.trap("SIGTERM", "IGNORE") + end - begin - NewRelic::Agent.disable_all_tracing do - graceful_disconnect - end - rescue => e - log.error e - log.error e.backtrace.join("\n") + begin + NewRelic::Agent.disable_all_tracing do + graceful_disconnect end + rescue => e + log.error e + log.error e.backtrace.join("\n") end @started = nil end # Tells the statistics engine we are starting a new transaction @@ -444,13 +442,13 @@ @last_harvest_time = Time.now @launch_time = Time.now end private - def collector - @collector ||= control.server - end +# def collector +# @collector ||= control.collector +# end # All of this module used to be contained in the # start_worker_thread method - this is an artifact of # refactoring and can be moved, renamed, etc at will module StartWorkerThread @@ -715,11 +713,11 @@ # Does some simple logging to make sure that our seed and # token for verification are correct, then returns the # connect data passed back from the server def connect_to_server log_seed_token - connect_data = invoke_remote(:connect, connect_settings) + @service.connect(connect_settings) end # Configures the error collector if the server says that we # are allowed to send errors. Pretty simple, and logs at # debug whether errors will or will not be sent. @@ -832,22 +830,20 @@ # Asks the collector to tell us which sub-collector we # should be reporting to, and then does the name resolution # on that host so we don't block on DNS during the normal # course of agent processing - def set_collector_host! - host = invoke_remote(:get_redirect_host) - if host - @collector = control.server_from_host(host) - end - end +# def set_collector_host! +# host = invoke_remote(:get_redirect_host) +# if host +# @collector = control.server_from_host(host) +# end +# end # Sets the collector host and connects to the server, then # invokes the final configuration with the returned data def query_server_for_configuration - set_collector_host! - finish_setup(connect_to_server) end # Takes a hash of configuration data returned from the # server and uses it to set local variables and to @@ -855,11 +851,12 @@ # separately. # # Can accommodate most arbitrary data - anything extra is # ignored unless we say to do something with it here. def finish_setup(config_data) - @agent_id = config_data['agent_run_id'] + return if config_data == nil + @service.agent_id = config_data['agent_run_id'] @report_period = config_data['data_report_period'] @url_rules = config_data['url_rules'] @beacon_configuration = BeaconConfiguration.new(config_data) @server_side_config_enabled = config_data['listen_to_server_config'] @@ -876,12 +873,12 @@ end # Logs when we connect to the server, for debugging purposes # - makes sure we know if an agent has not connected def log_connection!(config_data) - control.log! "Connected to NewRelic Service at #{@collector}" - log.debug "Agent Run = #{@agent_id}." + control.log! "Connected to NewRelic Service at #{@service.collector.name}" + log.debug "Agent Run = #{@service.agent_id}." log.debug "Connection data = #{config_data.inspect}" end end include Connect @@ -907,18 +904,19 @@ # dangerous if we re-merge the same data more than once - it # will be sent multiple times. def merge_data_from(data) metrics, transaction_traces, errors = data @stats_engine.merge_data(metrics) if metrics - if transaction_traces + if transaction_traces && transaction_traces.respond_to?(:any?) && + transaction_traces.any? if @traces - @traces = @traces + transaction_traces + @traces += transaction_traces else @traces = transaction_traces end end - if errors + if errors && errors.respond_to?(:any?) && errors.any? if @unsent_errors @unsent_errors = @unsent_errors + errors else @unsent_errors = errors end @@ -1012,26 +1010,18 @@ def harvest_and_send_timeslice_data now = Time.now NewRelic::Agent.instance.stats_engine.get_stats_no_scope('Supportability/invoke_remote').record_data_point(0.0) NewRelic::Agent.instance.stats_engine.get_stats_no_scope('Supportability/invoke_remote/metric_data').record_data_point(0.0) harvest_timeslice_data(now) - begin - # In this version of the protocol, we get back an assoc array of spec to id. - metric_specs_and_ids = invoke_remote(:metric_data, @agent_id, - @last_harvest_time.to_f, - now.to_f, - @unsent_timeslice_data.values) - - rescue Timeout::Error - # assume that the data was received. chances are that it - # was. Also, lol. - metric_specs_and_ids = [] - end - + # In this version of the protocol, we get back an assoc array of spec to id. + metric_specs_and_ids = @service.metric_data(@last_harvest_time.to_f, + now.to_f, + @unsent_timeslice_data.values) + metric_specs_and_ids ||= [] fill_metric_id_cache(metric_specs_and_ids) - log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@agent_id}) in #{Time.now - now} seconds" + log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@service.agent_id}) in #{Time.now - now} seconds" # if we successfully invoked this web service, then clear the unsent message cache. @unsent_timeslice_data = {} @last_harvest_time = now end @@ -1048,12 +1038,11 @@ # FIXME add the code to try to resend if our connection is down sql_traces = @sql_sampler.harvest unless sql_traces.empty? log.debug "Sending (#{sql_traces.size}) sql traces" begin - response = invoke_remote :sql_trace_data, sql_traces -# log.debug "Sql trace response: #{response}" + @service.sql_trace_data(sql_traces) rescue @sql_sampler.merge sql_traces end end end @@ -1075,18 +1064,18 @@ options[:record_sql] = @record_sql unless @record_sql == :off if @transaction_sampler.explain_enabled options[:explain_sql] = @transaction_sampler.explain_threshold end traces = @traces.collect {|trace| trace.prepare_to_send(options)} - invoke_remote :transaction_sample_data, @agent_id, traces + @service.transaction_sample_data(traces) rescue PostTooBigException # we tried to send too much data, drop the first trace and # try again retry if @traces.shift end - log.debug "Sent slowest sample (#{@agent_id}) in #{Time.now - now} seconds" + log.debug "Sent slowest sample (#{@service.agent_id}) in #{Time.now - now} seconds" end # if we succeed sending this sample, then we don't need to keep # the slowest sample around - it has been sent already and we # can clear the collection and move on @@ -1108,11 +1097,11 @@ def harvest_and_send_errors harvest_errors if @unsent_errors && @unsent_errors.length > 0 log.debug "Sending #{@unsent_errors.length} errors" begin - invoke_remote :error_data, @agent_id, @unsent_errors + @service.error_data(@unsent_errors) rescue PostTooBigException @unsent_errors.shift retry end # if the remote invocation fails, then we never clear @@ -1120,157 +1109,11 @@ # the next heartbeat. Note the error collector maxes out at # 20 instances to prevent leakage @unsent_errors = [] end end - - # This method handles the compression of the request body that - # we are going to send to the server - # - # We currently optimize for CPU here since we get roughly a 10x - # reduction in message size with this, and CPU overhead is at a - # premium. For extra-large posts, we use the higher compression - # since otherwise it actually errors out. - # - # We do not compress if content is smaller than 64kb. There are - # problems with bugs in Ruby in some versions that expose us - # to a risk of segfaults if we compress aggressively. - # - # medium payloads get fast compression, to save CPU - # big payloads get all the compression possible, to stay under - # the 2,000,000 byte post threshold - def compress_data(object) - dump = Marshal.dump(object) - - dump_size = dump.size - - return [dump, 'identity'] if dump_size < (64*1024) - - compressed_dump = Zlib::Deflate.deflate(dump, Zlib::DEFAULT_COMPRESSION) - - # this checks to make sure mongrel won't choke on big uploads - check_post_size(compressed_dump) - - [compressed_dump, 'deflate'] - end - - # Raises a PostTooBigException if the post_string is longer - # than the limit configured in the control object - def check_post_size(post_string) - # TODO: define this as a config option on the server side - return if post_string.size < control.post_size_limit - log.warn "Tried to send too much data: #{post_string.size} bytes" - raise PostTooBigException - end - - # Posts to the specified server - # - # Options: - # - :uri => the path to request on the server (a misnomer of - # course) - # - :encoding => the encoding to pass to the server - # - :collector => a URI object that responds to the 'name' method - # and returns the name of the collector to - # contact - # - :data => the data to send as the body of the request - def send_request(opts) - request = Net::HTTP::Post.new(opts[:uri], 'CONTENT-ENCODING' => opts[:encoding], 'HOST' => opts[:collector].name) - request['user-agent'] = user_agent - request.content_type = "application/octet-stream" - request.body = opts[:data] - - log.debug "Connect to #{opts[:collector]}#{opts[:uri]}" - - response = nil - http = control.http_connection(collector) - http.read_timeout = nil - begin - NewRelic::TimerLib.timeout(@request_timeout) do - response = http.request(request) - end - rescue Timeout::Error - log.warn "Timed out trying to post data to New Relic (timeout = #{@request_timeout} seconds)" unless @request_timeout < 30 - raise - end - if response.is_a? Net::HTTPServiceUnavailable - raise NewRelic::Agent::ServerConnectionException, "Service unavailable (#{response.code}): #{response.message}" - elsif response.is_a? Net::HTTPGatewayTimeOut - log.debug("Timed out getting response: #{response.message}") - raise Timeout::Error, response.message - elsif response.is_a? Net::HTTPRequestEntityTooLarge - raise PostTooBigException - elsif !(response.is_a? Net::HTTPSuccess) - raise NewRelic::Agent::ServerConnectionException, "Unexpected response from server (#{response.code}): #{response.message}" - end - response - end - - # Decompresses the response from the server, if it is gzip - # encoded, otherwise returns it verbatim - def decompress_response(response) - if response['content-encoding'] != 'gzip' - log.debug "Uncompressed content returned" - return response.body - end - log.debug "Decompressing return value" - i = Zlib::GzipReader.new(StringIO.new(response.body)) - i.read - end - - # unmarshals the response and raises it if it is an exception, - # so we can handle it in nonlocally - def check_for_exception(response) - dump = decompress_response(response) - value = Marshal.load(dump) - raise value if value.is_a? Exception - value - end - - # The path on the server that we should post our data to - def remote_method_uri(method) - uri = "/agent_listener/#{PROTOCOL_VERSION}/#{control.license_key}/#{method}" - uri << "?run_id=#{@agent_id}" if @agent_id - uri - end - - # Sets the user agent for connections to the server, to - # conform with the HTTP spec and allow for debugging. Includes - # the ruby version and also zlib version if available since - # that may cause corrupt compression if there is a problem. - def user_agent - ruby_description = '' - # note the trailing space! - ruby_description << "(ruby #{::RUBY_VERSION} #{::RUBY_PLATFORM}) " if defined?(::RUBY_VERSION) && defined?(::RUBY_PLATFORM) - zlib_version = '' - zlib_version << "zlib/#{Zlib.zlib_version}" if defined?(::Zlib) && Zlib.respond_to?(:zlib_version) - "NewRelic-RubyAgent/#{NewRelic::VERSION::STRING} #{ruby_description}#{zlib_version}" - end - - # send a message via post to the actual server. This attempts - # to automatically compress the data via zlib if it is large - # enough to be worth compressing, and handles any errors the - # server may return - def invoke_remote(method, *args) - now = Time.now - #determines whether to zip the data or send plain - post_data, encoding = compress_data(args) - - response = send_request({:uri => remote_method_uri(method), :encoding => encoding, :collector => collector, :data => post_data}) - - # raises the right exception if the remote server tells it to die - return check_for_exception(response) - rescue NewRelic::Agent::ForceRestartException => e - log.info e.message - raise - rescue SystemCallError, SocketError => e - # These include Errno connection errors - raise NewRelic::Agent::ServerConnectionException, "Recoverable error connecting to the server: #{e}" - ensure - NewRelic::Agent.instance.stats_engine.get_stats_no_scope('Supportability/invoke_remote').record_data_point((Time.now - now).to_f) - NewRelic::Agent.instance.stats_engine.get_stats_no_scope('Supportability/invoke_remote/' + method.to_s).record_data_point((Time.now - now).to_f) - end - + def save_or_transmit_data if NewRelic::DataSerialization.should_send_data? log.debug "Sending data to New Relic Service" NewRelic::Agent.load_data unless NewRelic::Control.instance.disable_serialization? harvest_and_send_errors @@ -1286,10 +1129,12 @@ NewRelic::Control.instance.log.warn("Disabling serialization: #{e.message}") retry_count ||= 0 retry_count += 1 retry unless retry_count > 1 raise e + ensure + NewRelic::Agent::Database.close_connections unless forked? end # This method contacts the server to send remaining data and # let the server know that the agent is shutting down - this # allows us to do things like accurately set the end of the @@ -1298,15 +1143,15 @@ # If this process comes from a parent process, it will not # disconnect, so that the parent process can continue to send data def graceful_disconnect if @connected begin - @request_timeout = 10 + @service.request_timeout = 10 save_or_transmit_data - if @connected_pid == $$ + if @connected_pid == $$ && !@service.kind_of?(NewRelic::Agent::NewRelicService) log.debug "Sending New Relic service agent run shutdown message" - invoke_remote :shutdown, @agent_id, Time.now.to_f + @service.shutdown(Time.now.to_f) else log.debug "This agent connected from parent process #{@connected_pid}--not sending shutdown" end log.debug "Graceful disconnect complete" rescue Timeout::Error, StandardError @@ -1314,10 +1159,10 @@ else log.debug "Bypassing graceful disconnect - agent not connected" end end end - + extend ClassMethods include InstanceMethods include BrowserMonitoring end end