lib/new_relic/agent/agent.rb in newrelic_rpm-3.3.5 vs lib/new_relic/agent/agent.rb in newrelic_rpm-3.4.0.beta1
- old
+ new
@@ -3,51 +3,39 @@
require 'net/http'
require 'logger'
require 'zlib'
require 'stringio'
require 'new_relic/data_serialization'
+require 'new_relic/agent/new_relic_service'
+require 'new_relic/agent/pipe_service'
module NewRelic
module Agent
# The Agent is a singleton that is instantiated when the plugin is
# activated. It collects performance data from ruby applications
# in realtime as the application runs, and periodically sends that
# data to the NewRelic server.
class Agent
-
- # Specifies the version of the agent's communication protocol with
- # the NewRelic hosted site.
-
- PROTOCOL_VERSION = 8
- # 14105: v8 (tag 2.10.3)
- # (no v7)
- # 10379: v6 (not tagged)
- # 4078: v5 (tag 2.5.4)
- # 2292: v4 (tag 2.3.6)
- # 1754: v3 (tag 2.3.0)
- # 534: v2 (shows up in 2.1.0, our first tag)
-
-
def initialize
-
@launch_time = Time.now
@metric_ids = {}
@stats_engine = NewRelic::Agent::StatsEngine.new
@transaction_sampler = NewRelic::Agent::TransactionSampler.new
@sql_sampler = NewRelic::Agent::SqlSampler.new
@stats_engine.transaction_sampler = @transaction_sampler
@error_collector = NewRelic::Agent::ErrorCollector.new
@connect_attempts = 0
- @request_timeout = NewRelic::Control.instance.fetch('timeout', 2 * 60)
-
@last_harvest_time = Time.now
@obfuscator = lambda {|sql| NewRelic::Agent::Database.default_sql_obfuscator(sql) }
+ @forked = false
+
+ @service = NewRelic::Agent::NewRelicService.new(control.license_key, control.server)
end
-
+
# contains all the class-level methods for NewRelic::Agent::Agent
module ClassMethods
# Should only be called by NewRelic::Control - returns a
# memoized singleton instance of the agent, creating one if needed
def instance
@@ -79,10 +67,11 @@
attr_reader :url_rules
# a configuration for the Real User Monitoring system -
# handles things like static setup of the header for inclusion
# into pages
attr_reader :beacon_configuration
+ attr_accessor :service
# Returns the length of the unsent errors array, if it exists,
# otherwise nil
def unsent_errors_size
@unsent_errors.length if @unsent_errors
@@ -156,15 +145,20 @@
# had not connected.
# * <tt>:keep_retrying => false</tt> if we try to initiate a new
# connection, this tells me to only try it once so this method returns
# quickly if there is some kind of latency with the server.
def after_fork(options={})
-
+ @forked = true
# @connected gets false after we fail to connect or have an error
# connecting. @connected has nil if we haven't finished trying to connect.
# or we didn't attempt a connection because this is the master process
-
+
+ if channel_id = options[:report_to_channel]
+ @service = NewRelic::Agent::PipeService.new(channel_id)
+ @connected_pid = $$
+ end
+
# log.debug "Agent received after_fork notice in #$$: [#{control.agent_enabled?}; monitor=#{control.monitor_mode?}; connected: #{@connected.inspect}; thread=#{@worker_thread.inspect}]"
return if !control.agent_enabled? or
!control.monitor_mode? or
@connected == false or
@worker_thread && @worker_thread.alive?
@@ -177,11 +171,15 @@
# Don't ever check to see if this is a spawner. If we're in a forked process
# I'm pretty sure we're not also forking new instances.
start_worker_thread(options)
@stats_engine.start_sampler_thread
end
-
+
+ def forked?
+ @forked
+ end
+
# True if we have initialized and completed 'start'
def started?
@started
end
@@ -201,29 +199,29 @@
run_loop_before_exit = options.fetch(:force_send, false)
return if not started?
if @worker_loop
@worker_loop.run_task if run_loop_before_exit
@worker_loop.stop
+ end
- log.debug "Starting Agent shutdown"
+ log.debug "Starting Agent shutdown"
- # if litespeed, then ignore all future SIGUSR1 - it's
- # litespeed trying to shut us down
+ # if litespeed, then ignore all future SIGUSR1 - it's
+ # litespeed trying to shut us down
- if control.dispatcher == :litespeed
- Signal.trap("SIGUSR1", "IGNORE")
- Signal.trap("SIGTERM", "IGNORE")
- end
+ if control.dispatcher == :litespeed
+ Signal.trap("SIGUSR1", "IGNORE")
+ Signal.trap("SIGTERM", "IGNORE")
+ end
- begin
- NewRelic::Agent.disable_all_tracing do
- graceful_disconnect
- end
- rescue => e
- log.error e
- log.error e.backtrace.join("\n")
+ begin
+ NewRelic::Agent.disable_all_tracing do
+ graceful_disconnect
end
+ rescue => e
+ log.error e
+ log.error e.backtrace.join("\n")
end
@started = nil
end
# Tells the statistics engine we are starting a new transaction
@@ -444,13 +442,13 @@
@last_harvest_time = Time.now
@launch_time = Time.now
end
private
- def collector
- @collector ||= control.server
- end
+# def collector
+# @collector ||= control.collector
+# end
# All of this module used to be contained in the
# start_worker_thread method - this is an artifact of
# refactoring and can be moved, renamed, etc at will
module StartWorkerThread
@@ -715,11 +713,11 @@
# Does some simple logging to make sure that our seed and
# token for verification are correct, then returns the
# connect data passed back from the server
def connect_to_server
log_seed_token
- connect_data = invoke_remote(:connect, connect_settings)
+ @service.connect(connect_settings)
end
# Configures the error collector if the server says that we
# are allowed to send errors. Pretty simple, and logs at
# debug whether errors will or will not be sent.
@@ -832,22 +830,20 @@
# Asks the collector to tell us which sub-collector we
# should be reporting to, and then does the name resolution
# on that host so we don't block on DNS during the normal
# course of agent processing
- def set_collector_host!
- host = invoke_remote(:get_redirect_host)
- if host
- @collector = control.server_from_host(host)
- end
- end
+# def set_collector_host!
+# host = invoke_remote(:get_redirect_host)
+# if host
+# @collector = control.server_from_host(host)
+# end
+# end
# Sets the collector host and connects to the server, then
# invokes the final configuration with the returned data
def query_server_for_configuration
- set_collector_host!
-
finish_setup(connect_to_server)
end
# Takes a hash of configuration data returned from the
# server and uses it to set local variables and to
@@ -855,11 +851,12 @@
# separately.
#
# Can accommodate most arbitrary data - anything extra is
# ignored unless we say to do something with it here.
def finish_setup(config_data)
- @agent_id = config_data['agent_run_id']
+ return if config_data == nil
+ @service.agent_id = config_data['agent_run_id']
@report_period = config_data['data_report_period']
@url_rules = config_data['url_rules']
@beacon_configuration = BeaconConfiguration.new(config_data)
@server_side_config_enabled = config_data['listen_to_server_config']
@@ -876,12 +873,12 @@
end
# Logs when we connect to the server, for debugging purposes
# - makes sure we know if an agent has not connected
def log_connection!(config_data)
- control.log! "Connected to NewRelic Service at #{@collector}"
- log.debug "Agent Run = #{@agent_id}."
+ control.log! "Connected to NewRelic Service at #{@service.collector.name}"
+ log.debug "Agent Run = #{@service.agent_id}."
log.debug "Connection data = #{config_data.inspect}"
end
end
include Connect
@@ -907,18 +904,19 @@
# dangerous if we re-merge the same data more than once - it
# will be sent multiple times.
def merge_data_from(data)
metrics, transaction_traces, errors = data
@stats_engine.merge_data(metrics) if metrics
- if transaction_traces
+ if transaction_traces && transaction_traces.respond_to?(:any?) &&
+ transaction_traces.any?
if @traces
- @traces = @traces + transaction_traces
+ @traces += transaction_traces
else
@traces = transaction_traces
end
end
- if errors
+ if errors && errors.respond_to?(:any?) && errors.any?
if @unsent_errors
@unsent_errors = @unsent_errors + errors
else
@unsent_errors = errors
end
@@ -1012,26 +1010,18 @@
def harvest_and_send_timeslice_data
now = Time.now
NewRelic::Agent.instance.stats_engine.get_stats_no_scope('Supportability/invoke_remote').record_data_point(0.0)
NewRelic::Agent.instance.stats_engine.get_stats_no_scope('Supportability/invoke_remote/metric_data').record_data_point(0.0)
harvest_timeslice_data(now)
- begin
- # In this version of the protocol, we get back an assoc array of spec to id.
- metric_specs_and_ids = invoke_remote(:metric_data, @agent_id,
- @last_harvest_time.to_f,
- now.to_f,
- @unsent_timeslice_data.values)
-
- rescue Timeout::Error
- # assume that the data was received. chances are that it
- # was. Also, lol.
- metric_specs_and_ids = []
- end
-
+ # In this version of the protocol, we get back an assoc array of spec to id.
+ metric_specs_and_ids = @service.metric_data(@last_harvest_time.to_f,
+ now.to_f,
+ @unsent_timeslice_data.values)
+ metric_specs_and_ids ||= []
fill_metric_id_cache(metric_specs_and_ids)
- log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@agent_id}) in #{Time.now - now} seconds"
+ log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@service.agent_id}) in #{Time.now - now} seconds"
# if we successfully invoked this web service, then clear the unsent message cache.
@unsent_timeslice_data = {}
@last_harvest_time = now
end
@@ -1048,12 +1038,11 @@
# FIXME add the code to try to resend if our connection is down
sql_traces = @sql_sampler.harvest
unless sql_traces.empty?
log.debug "Sending (#{sql_traces.size}) sql traces"
begin
- response = invoke_remote :sql_trace_data, sql_traces
-# log.debug "Sql trace response: #{response}"
+ @service.sql_trace_data(sql_traces)
rescue
@sql_sampler.merge sql_traces
end
end
end
@@ -1075,18 +1064,18 @@
options[:record_sql] = @record_sql unless @record_sql == :off
if @transaction_sampler.explain_enabled
options[:explain_sql] = @transaction_sampler.explain_threshold
end
traces = @traces.collect {|trace| trace.prepare_to_send(options)}
- invoke_remote :transaction_sample_data, @agent_id, traces
+ @service.transaction_sample_data(traces)
rescue PostTooBigException
# we tried to send too much data, drop the first trace and
# try again
retry if @traces.shift
end
- log.debug "Sent slowest sample (#{@agent_id}) in #{Time.now - now} seconds"
+ log.debug "Sent slowest sample (#{@service.agent_id}) in #{Time.now - now} seconds"
end
# if we succeed sending this sample, then we don't need to keep
# the slowest sample around - it has been sent already and we
# can clear the collection and move on
@@ -1108,11 +1097,11 @@
def harvest_and_send_errors
harvest_errors
if @unsent_errors && @unsent_errors.length > 0
log.debug "Sending #{@unsent_errors.length} errors"
begin
- invoke_remote :error_data, @agent_id, @unsent_errors
+ @service.error_data(@unsent_errors)
rescue PostTooBigException
@unsent_errors.shift
retry
end
# if the remote invocation fails, then we never clear
@@ -1120,157 +1109,11 @@
# the next heartbeat. Note the error collector maxes out at
# 20 instances to prevent leakage
@unsent_errors = []
end
end
-
- # This method handles the compression of the request body that
- # we are going to send to the server
- #
- # We currently optimize for CPU here since we get roughly a 10x
- # reduction in message size with this, and CPU overhead is at a
- # premium. For extra-large posts, we use the higher compression
- # since otherwise it actually errors out.
- #
- # We do not compress if content is smaller than 64kb. There are
- # problems with bugs in Ruby in some versions that expose us
- # to a risk of segfaults if we compress aggressively.
- #
- # medium payloads get fast compression, to save CPU
- # big payloads get all the compression possible, to stay under
- # the 2,000,000 byte post threshold
- def compress_data(object)
- dump = Marshal.dump(object)
-
- dump_size = dump.size
-
- return [dump, 'identity'] if dump_size < (64*1024)
-
- compressed_dump = Zlib::Deflate.deflate(dump, Zlib::DEFAULT_COMPRESSION)
-
- # this checks to make sure mongrel won't choke on big uploads
- check_post_size(compressed_dump)
-
- [compressed_dump, 'deflate']
- end
-
- # Raises a PostTooBigException if the post_string is longer
- # than the limit configured in the control object
- def check_post_size(post_string)
- # TODO: define this as a config option on the server side
- return if post_string.size < control.post_size_limit
- log.warn "Tried to send too much data: #{post_string.size} bytes"
- raise PostTooBigException
- end
-
- # Posts to the specified server
- #
- # Options:
- # - :uri => the path to request on the server (a misnomer of
- # course)
- # - :encoding => the encoding to pass to the server
- # - :collector => a URI object that responds to the 'name' method
- # and returns the name of the collector to
- # contact
- # - :data => the data to send as the body of the request
- def send_request(opts)
- request = Net::HTTP::Post.new(opts[:uri], 'CONTENT-ENCODING' => opts[:encoding], 'HOST' => opts[:collector].name)
- request['user-agent'] = user_agent
- request.content_type = "application/octet-stream"
- request.body = opts[:data]
-
- log.debug "Connect to #{opts[:collector]}#{opts[:uri]}"
-
- response = nil
- http = control.http_connection(collector)
- http.read_timeout = nil
- begin
- NewRelic::TimerLib.timeout(@request_timeout) do
- response = http.request(request)
- end
- rescue Timeout::Error
- log.warn "Timed out trying to post data to New Relic (timeout = #{@request_timeout} seconds)" unless @request_timeout < 30
- raise
- end
- if response.is_a? Net::HTTPServiceUnavailable
- raise NewRelic::Agent::ServerConnectionException, "Service unavailable (#{response.code}): #{response.message}"
- elsif response.is_a? Net::HTTPGatewayTimeOut
- log.debug("Timed out getting response: #{response.message}")
- raise Timeout::Error, response.message
- elsif response.is_a? Net::HTTPRequestEntityTooLarge
- raise PostTooBigException
- elsif !(response.is_a? Net::HTTPSuccess)
- raise NewRelic::Agent::ServerConnectionException, "Unexpected response from server (#{response.code}): #{response.message}"
- end
- response
- end
-
- # Decompresses the response from the server, if it is gzip
- # encoded, otherwise returns it verbatim
- def decompress_response(response)
- if response['content-encoding'] != 'gzip'
- log.debug "Uncompressed content returned"
- return response.body
- end
- log.debug "Decompressing return value"
- i = Zlib::GzipReader.new(StringIO.new(response.body))
- i.read
- end
-
- # unmarshals the response and raises it if it is an exception,
- # so we can handle it in nonlocally
- def check_for_exception(response)
- dump = decompress_response(response)
- value = Marshal.load(dump)
- raise value if value.is_a? Exception
- value
- end
-
- # The path on the server that we should post our data to
- def remote_method_uri(method)
- uri = "/agent_listener/#{PROTOCOL_VERSION}/#{control.license_key}/#{method}"
- uri << "?run_id=#{@agent_id}" if @agent_id
- uri
- end
-
- # Sets the user agent for connections to the server, to
- # conform with the HTTP spec and allow for debugging. Includes
- # the ruby version and also zlib version if available since
- # that may cause corrupt compression if there is a problem.
- def user_agent
- ruby_description = ''
- # note the trailing space!
- ruby_description << "(ruby #{::RUBY_VERSION} #{::RUBY_PLATFORM}) " if defined?(::RUBY_VERSION) && defined?(::RUBY_PLATFORM)
- zlib_version = ''
- zlib_version << "zlib/#{Zlib.zlib_version}" if defined?(::Zlib) && Zlib.respond_to?(:zlib_version)
- "NewRelic-RubyAgent/#{NewRelic::VERSION::STRING} #{ruby_description}#{zlib_version}"
- end
-
- # send a message via post to the actual server. This attempts
- # to automatically compress the data via zlib if it is large
- # enough to be worth compressing, and handles any errors the
- # server may return
- def invoke_remote(method, *args)
- now = Time.now
- #determines whether to zip the data or send plain
- post_data, encoding = compress_data(args)
-
- response = send_request({:uri => remote_method_uri(method), :encoding => encoding, :collector => collector, :data => post_data})
-
- # raises the right exception if the remote server tells it to die
- return check_for_exception(response)
- rescue NewRelic::Agent::ForceRestartException => e
- log.info e.message
- raise
- rescue SystemCallError, SocketError => e
- # These include Errno connection errors
- raise NewRelic::Agent::ServerConnectionException, "Recoverable error connecting to the server: #{e}"
- ensure
- NewRelic::Agent.instance.stats_engine.get_stats_no_scope('Supportability/invoke_remote').record_data_point((Time.now - now).to_f)
- NewRelic::Agent.instance.stats_engine.get_stats_no_scope('Supportability/invoke_remote/' + method.to_s).record_data_point((Time.now - now).to_f)
- end
-
+
def save_or_transmit_data
if NewRelic::DataSerialization.should_send_data?
log.debug "Sending data to New Relic Service"
NewRelic::Agent.load_data unless NewRelic::Control.instance.disable_serialization?
harvest_and_send_errors
@@ -1286,10 +1129,12 @@
NewRelic::Control.instance.log.warn("Disabling serialization: #{e.message}")
retry_count ||= 0
retry_count += 1
retry unless retry_count > 1
raise e
+ ensure
+ NewRelic::Agent::Database.close_connections unless forked?
end
# This method contacts the server to send remaining data and
# let the server know that the agent is shutting down - this
# allows us to do things like accurately set the end of the
@@ -1298,15 +1143,15 @@
# If this process comes from a parent process, it will not
# disconnect, so that the parent process can continue to send data
def graceful_disconnect
if @connected
begin
- @request_timeout = 10
+ @service.request_timeout = 10
save_or_transmit_data
- if @connected_pid == $$
+ if @connected_pid == $$ && !@service.kind_of?(NewRelic::Agent::NewRelicService)
log.debug "Sending New Relic service agent run shutdown message"
- invoke_remote :shutdown, @agent_id, Time.now.to_f
+ @service.shutdown(Time.now.to_f)
else
log.debug "This agent connected from parent process #{@connected_pid}--not sending shutdown"
end
log.debug "Graceful disconnect complete"
rescue Timeout::Error, StandardError
@@ -1314,10 +1159,10 @@
else
log.debug "Bypassing graceful disconnect - agent not connected"
end
end
end
-
+
extend ClassMethods
include InstanceMethods
include BrowserMonitoring
end
end