lib/new_relic/agent/agent.rb in ghazel-newrelic_rpm-3.1.0.1 vs lib/new_relic/agent/agent.rb in ghazel-newrelic_rpm-3.4.0.2
- old
+ new
@@ -2,94 +2,113 @@
require 'net/https'
require 'net/http'
require 'logger'
require 'zlib'
require 'stringio'
-require 'new_relic/data_serialization'
+require 'new_relic/agent/new_relic_service'
+require 'new_relic/agent/pipe_service'
module NewRelic
module Agent
# The Agent is a singleton that is instantiated when the plugin is
# activated. It collects performance data from ruby applications
# in realtime as the application runs, and periodically sends that
# data to the NewRelic server.
class Agent
-
- # Specifies the version of the agent's communication protocol with
- # the NewRelic hosted site.
-
- PROTOCOL_VERSION = 8
- # 14105: v8 (tag 2.10.3)
- # (no v7)
- # 10379: v6 (not tagged)
- # 4078: v5 (tag 2.5.4)
- # 2292: v4 (tag 2.3.6)
- # 1754: v3 (tag 2.3.0)
- # 534: v2 (shows up in 2.1.0, our first tag)
-
-
def initialize
-
@launch_time = Time.now
@metric_ids = {}
- @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10)
@stats_engine = NewRelic::Agent::StatsEngine.new
@transaction_sampler = NewRelic::Agent::TransactionSampler.new
+ @sql_sampler = NewRelic::Agent::SqlSampler.new
@stats_engine.transaction_sampler = @transaction_sampler
@error_collector = NewRelic::Agent::ErrorCollector.new
@connect_attempts = 0
- @request_timeout = NewRelic::Control.instance.fetch('timeout', 2 * 60)
-
@last_harvest_time = Time.now
- @obfuscator = method(:default_sql_obfuscator)
+ @obfuscator = lambda {|sql| NewRelic::Agent::Database.default_sql_obfuscator(sql) }
+ @forked = false
+
+ # FIXME: temporary work around for RUBY-839
+ if control.monitor_mode?
+ @service = NewRelic::Agent::NewRelicService.new(control.license_key, control.server)
+ end
end
+ # contains all the class-level methods for NewRelic::Agent::Agent
module ClassMethods
- # Should only be called by NewRelic::Control
+ # Should only be called by NewRelic::Control - returns a
+ # memoized singleton instance of the agent, creating one if needed
def instance
@instance ||= self.new
end
end
-
+
+ # Holds all the methods defined on NewRelic::Agent::Agent
+ # instances
module InstanceMethods
-
+
+ # holds a proc that is used to obfuscate sql statements
attr_reader :obfuscator
+ # the statistics engine that holds all the timeslice data
attr_reader :stats_engine
+ # the transaction sampler that handles recording transactions
attr_reader :transaction_sampler
+ attr_reader :sql_sampler
+ # error collector is a simple collection of recorded errors
attr_reader :error_collector
+ # whether we should record raw, obfuscated, or no sql
attr_reader :record_sql
- attr_reader :histogram
+ # a cached set of metric_ids to save the collector some time -
+ # it returns a metric id for every metric name we send it, and
+ # in the future we transmit using the metric id only
attr_reader :metric_ids
+ # in theory a set of rules applied by the agent to the output
+ # of its metrics. Currently unimplemented
attr_reader :url_rules
+ # a configuration for the Real User Monitoring system -
+ # handles things like static setup of the header for inclusion
+ # into pages
attr_reader :beacon_configuration
-
+ attr_accessor :service
+
+ # Returns the length of the unsent errors array, if it exists,
+ # otherwise nil
def unsent_errors_size
@unsent_errors.length if @unsent_errors
end
-
+
+ # Returns the length of the traces array, if it exists,
+ # otherwise nil
def unsent_traces_size
@traces.length if @traces
end
-
+
+ # Initializes the unsent timeslice data hash, if needed, and
+ # returns the number of keys it contains
def unsent_timeslice_data
@unsent_timeslice_data ||= {}
@unsent_timeslice_data.keys.length
end
+ # fakes out a transaction that did not happen in this process
+ # by creating apdex, summary metrics, and recording statistics
+ # for the transaction
+ #
+ # This method is *deprecated* - it may be removed in future
+ # versions of the agent
def record_transaction(duration_seconds, options={})
is_error = options['is_error'] || options['error_message'] || options['exception']
metric = options['metric']
metric ||= options['uri'] # normalize this with url rules
raise "metric or uri arguments required" unless metric
metric_info = NewRelic::MetricParser::MetricParser.for_metric_named(metric)
if metric_info.is_web_transaction?
NewRelic::Agent::Instrumentation::MetricFrame.record_apdex(metric_info, duration_seconds, duration_seconds, is_error)
- histogram.process(duration_seconds)
end
metrics = metric_info.summary_metrics
metrics << metric
metrics.each do |name|
@@ -99,13 +118,13 @@
if is_error
if options['exception']
e = options['exception']
elsif options['error_message']
- e = Exception.new options['error_message']
+ e = StandardError.new options['error_message']
else
- e = Exception.new 'Unknown Error'
+ e = StandardError.new 'Unknown Error'
end
error_collector.notice_error e, :uri => options['uri'], :metric => metric
end
# busy time ?
end
@@ -128,15 +147,21 @@
# had not connected.
# * <tt>:keep_retrying => false</tt> if we try to initiate a new
# connection, this tells me to only try it once so this method returns
# quickly if there is some kind of latency with the server.
def after_fork(options={})
-
+ @forked = true
# @connected gets false after we fail to connect or have an error
# connecting. @connected has nil if we haven't finished trying to connect.
# or we didn't attempt a connection because this is the master process
-
+
+ if channel_id = options[:report_to_channel]
+ @service = NewRelic::Agent::PipeService.new(channel_id)
+ @connected_pid = $$
+ @metric_ids = {}
+ end
+
# log.debug "Agent received after_fork notice in #$$: [#{control.agent_enabled?}; monitor=#{control.monitor_mode?}; connected: #{@connected.inspect}; thread=#{@worker_thread.inspect}]"
return if !control.agent_enabled? or
!control.monitor_mode? or
@connected == false or
@worker_thread && @worker_thread.alive?
@@ -149,11 +174,15 @@
# Don't ever check to see if this is a spawner. If we're in a forked process
# I'm pretty sure we're not also forking new instances.
start_worker_thread(options)
@stats_engine.start_sampler_thread
end
-
+
+ def forked?
+ @forked
+ end
+
# True if we have initialized and completed 'start'
def started?
@started
end
@@ -161,61 +190,77 @@
# and false if we failed to start.
def connected?
@connected
end
- # Attempt a graceful shutdown of the agent.
+ # Attempt a graceful shutdown of the agent, running the worker
+ # loop if it exists and is running.
+ #
+ # Options:
+ # :force_send => (true/false) # force the agent to send data
+ # before shutting down
def shutdown(options={})
run_loop_before_exit = options.fetch(:force_send, false)
return if not started?
if @worker_loop
@worker_loop.run_task if run_loop_before_exit
@worker_loop.stop
+ end
- log.debug "Starting Agent shutdown"
+ log.debug "Starting Agent shutdown"
- # if litespeed, then ignore all future SIGUSR1 - it's
- # litespeed trying to shut us down
+ # if litespeed, then ignore all future SIGUSR1 - it's
+ # litespeed trying to shut us down
- if control.dispatcher == :litespeed
- Signal.trap("SIGUSR1", "IGNORE")
- Signal.trap("SIGTERM", "IGNORE")
- end
+ if control.dispatcher == :litespeed
+ Signal.trap("SIGUSR1", "IGNORE")
+ Signal.trap("SIGTERM", "IGNORE")
+ end
- begin
- NewRelic::Agent.disable_all_tracing do
- graceful_disconnect
- end
- rescue => e
- log.error e
- log.error e.backtrace.join("\n")
+ begin
+ NewRelic::Agent.disable_all_tracing do
+ graceful_disconnect
end
+ rescue => e
+ log.error e
+ log.error e.backtrace.join("\n")
end
@started = nil
end
+ # Tells the statistics engine we are starting a new transaction
def start_transaction
@stats_engine.start_transaction
end
+ # Tells the statistics engine we are ending a transaction
def end_transaction
@stats_engine.end_transaction
end
+ # Sets a thread local variable as to whether we should or
+ # should not record sql in the current thread. Returns the
+ # previous value, if there is one
def set_record_sql(should_record)
prev = Thread::current[:record_sql]
Thread::current[:record_sql] = should_record
prev.nil? || prev
end
+ # Sets a thread local variable as to whether we should or
+ # should not record transaction traces in the current
+ # thread. Returns the previous value, if there is one
def set_record_tt(should_record)
prev = Thread::current[:record_tt]
Thread::current[:record_tt] = should_record
prev.nil? || prev
end
+
# Push flag indicating whether we should be tracing in this
- # thread.
+ # thread. This uses a stack which allows us to disable tracing
+ # children of a transaction without affecting the tracing of
+ # the whole transaction
def push_trace_execution_flag(should_trace=false)
value = Thread.current[:newrelic_untraced]
if (value.nil?)
Thread.current[:newrelic_untraced] = []
end
@@ -227,169 +272,147 @@
# to what it was before we pushed the current flag.
def pop_trace_execution_flag
Thread.current[:newrelic_untraced].pop if Thread.current[:newrelic_untraced]
end
- def set_sql_obfuscator(type, &block)
- if type == :before
- @obfuscator = NewRelic::ChainedCall.new(block, @obfuscator)
- elsif type == :after
- @obfuscator = NewRelic::ChainedCall.new(@obfuscator, block)
- elsif type == :replace
- @obfuscator = block
- else
- fail "unknown sql_obfuscator type #{type}"
- end
- end
-
+ # Shorthand to the NewRelic::Agent.logger method
def log
NewRelic::Agent.logger
end
# Herein lies the corpse of the former 'start' method. May
# it's unmatched flog score rest in pieces.
module Start
+ # Check whether we have already started, which is an error condition
def already_started?
if started?
control.log!("Agent Started Already!", :error)
true
end
end
+ # The agent is disabled when it is not force enabled by the
+ # 'agent_enabled' option (e.g. in a manual start), or
+ # enabled normally through the configuration file
def disabled?
!control.agent_enabled?
end
+ # Logs the dispatcher to the log file to assist with
+ # debugging. When no debugger is present, logs this fact to
+ # assist with proper dispatcher detection
def log_dispatcher
dispatcher_name = control.dispatcher.to_s
return if log_if(dispatcher_name.empty?, :info, "No dispatcher detected.")
log.info "Dispatcher: #{dispatcher_name}"
end
+ # Logs the configured application names
def log_app_names
log.info "Application: #{control.app_names.join(", ")}"
end
-
- def apdex_f
- (4 * NewRelic::Control.instance.apdex_t).to_f
- end
-
- def apdex_f_threshold?
- sampler_config.fetch('transaction_threshold', '') =~ /apdex_f/i
- end
-
- def set_sql_recording!
- record_sql_config = sampler_config.fetch('record_sql', :obfuscated)
- case record_sql_config.to_s
- when 'off'
- @record_sql = :off
- when 'none'
- @record_sql = :off
- when 'false'
- @record_sql = :off
- when 'raw'
- @record_sql = :raw
- else
- @record_sql = :obfuscated
- end
-
- log_sql_transmission_warning?
- end
-
- def log_sql_transmission_warning?
- log_if((@record_sql == :raw), :warn, "Agent is configured to send raw SQL to the service")
- end
-
- def sampler_config
- control.fetch('transaction_tracer', {})
- end
-
- # this entire method should be done on the transaction
- # sampler object, rather than here. We should pass in the
- # sampler config.
- def config_transaction_tracer
- @should_send_samples = @config_should_send_samples = sampler_config.fetch('enabled', true)
- @should_send_random_samples = sampler_config.fetch('random_sample', false)
- @explain_threshold = sampler_config.fetch('explain_threshold', 0.5).to_f
- @explain_enabled = sampler_config.fetch('explain_enabled', true)
- set_sql_recording!
-
- # default to 2.0, string 'apdex_f' will turn into your
- # apdex * 4
- @slowest_transaction_threshold = sampler_config.fetch('transaction_threshold', 2.0).to_f
- @slowest_transaction_threshold = apdex_f if apdex_f_threshold?
- end
-
+
+ # Connecting in the foreground blocks further startup of the
+ # agent until we have a connection - useful in cases where
+ # you're trying to log a very-short-running process and want
+ # to get statistics from before a server connection
+ # (typically 20 seconds) exists
def connect_in_foreground
NewRelic::Agent.disable_all_tracing { connect(:keep_retrying => false) }
end
- def using_rubinius?
- RUBY_VERSION =~ /rubinius/i
- end
-
- def using_jruby?
- defined?(JRuby)
- end
-
+ # If we're using sinatra, old versions run in an at_exit
+ # block so we should probably know that
def using_sinatra?
defined?(Sinatra::Application)
end
# we should not set an at_exit block if people are using
# these as they don't do standard at_exit behavior per MRI/YARV
def weird_ruby?
- using_rubinius? || using_jruby? || using_sinatra?
+ NewRelic::LanguageSupport.using_engine?('rbx') ||
+ NewRelic::LanguageSupport.using_engine?('jruby') ||
+ using_sinatra?
end
+ # Installs our exit handler, which exploits the weird
+ # behavior of at_exit blocks to make sure it runs last, by
+ # doing an at_exit within an at_exit block.
def install_exit_handler
if control.send_data_on_exit && !weird_ruby?
# Our shutdown handler needs to run after other shutdown handlers
at_exit { at_exit { shutdown } }
end
end
+ # Tells us in the log file where the log file is
+ # located. This seems redundant, but can come in handy when
+ # we have some log file path set by the user which parses
+ # incorrectly, sending the log file to who-knows-where
def notify_log_file_location
log_file = NewRelic::Control.instance.log_file
- log_if(log_file, :info, "Agent Log found in #{log_file}")
+ log_if(File.exists?(log_file.to_s), :info,
+ "Agent Log at #{log_file}")
end
+ # Classy logging of the agent version and the current pid,
+ # so we can disambiguate processes in the log file and make
+ # sure they're running a reasonable version
def log_version_and_pid
log.info "New Relic Ruby Agent #{NewRelic::VERSION::STRING} Initialized: pid = #{$$}"
end
+ # A helper method that logs a condition if that condition is
+ # true. Mentally cleaner than having every method set a
+ # local and log if it is true
def log_if(boolean, level, message)
self.log.send(level, message) if boolean
boolean
end
+ # A helper method that logs a condition unless that
+ # condition is true. Mentally cleaner than having every
+ # method set a local and log unless it is true
def log_unless(boolean, level, message)
self.log.send(level, message) unless boolean
boolean
end
+ # Warn the user if they have configured their agent not to
+ # send data, that way we can see this clearly in the log file
def monitoring?
log_unless(control.monitor_mode?, :warn, "Agent configured not to send data in this environment - edit newrelic.yml to change this")
end
+ # Tell the user when the license key is missing so they can
+ # fix it by adding it to the file
def has_license_key?
log_unless(control.license_key, :error, "No license key found. Please edit your newrelic.yml file and insert your license key.")
end
+ # A correct license key exists and is of the proper length
def has_correct_license_key?
has_license_key? && correct_license_length
end
+ # A license key is an arbitrary 40 character string,
+ # usually looks something like a SHA1 hash
def correct_license_length
key = control.license_key
log_unless((key.length == 40), :error, "Invalid license key: #{key}")
end
+ # If we're using a dispatcher that forks before serving
+ # requests, we need to wait until the children are forked
+ # before connecting, otherwise the parent process sends odd data
def using_forking_dispatcher?
- log_if([:passenger, :unicorn].include?(control.dispatcher), :info, "Connecting workers after forking.")
+ log_if([:passenger, :unicorn, :rainbows].include?(control.dispatcher), :info, "Connecting workers after forking.")
end
+ # Sanity-check the agent configuration and start the agent,
+ # setting up the worker thread and the exit handler to shut
+ # down the agent
def check_config_and_start_agent
return unless monitoring? && has_correct_license_key?
return if using_forking_dispatcher?
connect_in_foreground if control.sync_startup
start_worker_thread
@@ -397,10 +420,11 @@
end
end
include Start
+ # Logs a bunch of data and starts the agent, if needed
def start
return if already_started? || disabled?
@started = true
@local_host = determine_host
log_dispatcher
@@ -409,108 +433,138 @@
check_config_and_start_agent
log_version_and_pid
notify_log_file_location
end
- # Clear out the metric data, errors, and transaction traces. Reset the histogram data.
+ # Clear out the metric data, errors, and transaction traces,
+ # making sure the agent is in a fresh state
def reset_stats
@stats_engine.reset_stats
@unsent_errors = []
@traces = nil
@unsent_timeslice_data = {}
@last_harvest_time = Time.now
@launch_time = Time.now
- @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10)
end
private
- def collector
- @collector ||= control.server
- end
-
+
+ # All of this module used to be contained in the
+ # start_worker_thread method - this is an artifact of
+ # refactoring and can be moved, renamed, etc at will
module StartWorkerThread
+ # disable transaction sampling if disabled by the server
+ # and we're not in dev mode
def check_transaction_sampler_status
- # disable transaction sampling if disabled by the server
- # and we're not in dev mode
if control.developer_mode? || @should_send_samples
@transaction_sampler.enable
else
@transaction_sampler.disable
end
end
+
+ def check_sql_sampler_status
+ # disable sql sampling if disabled by the server
+ # and we're not in dev mode
+ if @sql_sampler.config.fetch('enabled', true) && ['raw', 'obfuscated'].include?(@sql_sampler.config.fetch('record_sql', 'obfuscated').to_s) && @transaction_sampler.config.fetch('enabled', true)
+ @sql_sampler.enable
+ else
+ @sql_sampler.disable
+ end
+ end
+ # logs info about the worker loop so users can see when the
+ # agent actually begins running in the background
def log_worker_loop_start
log.info "Reporting performance data every #{@report_period} seconds."
log.debug "Running worker loop"
end
+ # Creates the worker loop and loads it with the instructions
+ # it should run every @report_period seconds
def create_and_run_worker_loop
@worker_loop = WorkerLoop.new
@worker_loop.run(@report_period) do
- NewRelic::Agent.load_data
- harvest_and_send_errors
- harvest_and_send_slowest_sample
- harvest_and_send_timeslice_data
+ transmit_data
end
end
+ # Handles the case where the server tells us to restart -
+ # this clears the data, clears connection attempts, and
+ # waits a while to reconnect.
def handle_force_restart(error)
log.info error.message
- # disconnect and start over.
- # clear the stats engine
reset_stats
@metric_ids = {}
@connected = nil
- # Wait a short time before trying to reconnect
sleep 30
end
+ # when a disconnect is requested, stop the current thread, which
+ # is the worker thread that gathers data and talks to the
+ # server.
def handle_force_disconnect(error)
- # when a disconnect is requested, stop the current thread, which
- # is the worker thread that gathers data and talks to the
- # server.
log.error "New Relic forced this agent to disconnect (#{error.message})"
disconnect
end
+ # there is a problem with connecting to the server, so we
+ # stop trying to connect and shut down the agent
def handle_server_connection_problem(error)
log.error "Unable to establish connection with the server. Run with log level set to debug for more information."
log.debug("#{error.class.name}: #{error.message}\n#{error.backtrace.first}")
disconnect
end
+ # Handles an unknown error in the worker thread by logging
+ # it and disconnecting the agent, since we are now in an
+ # unknown state
def handle_other_error(error)
log.error "Terminating worker loop: #{error.class.name}: #{error.message}\n #{error.backtrace.join("\n ")}"
disconnect
end
+ # a wrapper method to handle all the errors that can happen
+ # in the connection and worker thread system. This
+ # guarantees a no-throw from the background thread.
def catch_errors
yield
rescue NewRelic::Agent::ForceRestartException => e
handle_force_restart(e)
retry
rescue NewRelic::Agent::ForceDisconnectException => e
handle_force_disconnect(e)
rescue NewRelic::Agent::ServerConnectionException => e
handle_server_connection_problem(e)
- rescue Exception => e
+ rescue => e
handle_other_error(e)
end
+ # This is the method that is run in a new thread in order to
+ # background the harvesting and sending of data during the
+ # normal operation of the agent.
+ #
+ # Takes connection options that determine how we should
+ # connect to the server, and loops endlessly - typically we
+ # never return from this method unless we're shutting down
+ # the agent
def deferred_work!(connection_options)
catch_errors do
NewRelic::Agent.disable_all_tracing do
# We try to connect. If this returns false that means
# the server rejected us for a licensing reason and we should
# just exit the thread. If it returns nil
# that means it didn't try to connect because we're in the master.
connect(connection_options)
if @connected
check_transaction_sampler_status
+ check_sql_sampler_status
log_worker_loop_start
create_and_run_worker_loop
+ # never reaches here unless there is a problem or
+ # the agent is exiting
else
log.debug "No connection. Worker thread ending."
end
end
end
@@ -527,40 +581,63 @@
deferred_work!(connection_options)
end # thread new
@worker_thread['newrelic_label'] = 'Worker Loop'
end
+ # A shorthand for NewRelic::Control.instance
def control
NewRelic::Control.instance
end
-
+
+ # This module is an artifact of a refactoring of the connect
+ # method - all of its methods are used in that context, so it
+ # can be refactored at will. It should be fully tested
module Connect
+ # the frequency with which we should try to connect to the
+ # server at the moment.
attr_accessor :connect_retry_period
+ # number of attempts we've made to contact the server
attr_accessor :connect_attempts
+ # Disconnect just sets connected to false, which prevents
+ # the agent from trying to connect again
def disconnect
@connected = false
true
end
+ # We've tried to connect if @connected is not nil, or if we
+ # are forcing reconnection (i.e. in the case of an
+ # after_fork with long running processes)
def tried_to_connect?(options)
!(@connected.nil? || options[:force_reconnect])
end
+ # We keep trying by default, but you can disable it with the
+ # :keep_retrying option set to false
def should_keep_retrying?(options)
@keep_retrying = (options[:keep_retrying].nil? || options[:keep_retrying])
end
+ # Retry period is a minute for each failed attempt that
+ # we've made. This should probably do some sort of sane TCP
+ # backoff to prevent hammering the server, but a minute for
+ # each attempt seems to work reasonably well.
def get_retry_period
return 600 if self.connect_attempts > 6
connect_attempts * 60
end
- def increment_retry_period!
+ def increment_retry_period! #:nodoc:
self.connect_retry_period=(get_retry_period)
end
+ # We should only retry when there has not been a more
+ # serious condition that would prevent it. We increment the
+ # connect attempts and the retry period, to prevent constant
+ # connection attempts, and tell the user what we're doing by
+ # logging.
def should_retry?
if @keep_retrying
self.connect_attempts=(connect_attempts + 1)
increment_retry_period!
log.info "Will re-attempt in #{connect_retry_period} seconds"
@@ -569,38 +646,59 @@
disconnect
false
end
end
+ # When we have a problem connecting to the server, we need
+ # to tell the user what happened, since this is not an error
+ # we can handle gracefully.
def log_error(error)
log.error "Error establishing connection with New Relic Service at #{control.server}: #{error.message}"
log.debug error.backtrace.join("\n")
end
+ # When the server sends us an error with the license key, we
+ # want to tell the user that something went wrong, and let
+ # them know where to go to get a valid license key
+ #
+ # After this runs, it disconnects the agent so that it will
+ # no longer try to connect to the server, saving the
+ # application and the server load
def handle_license_error(error)
log.error error.message
log.info "Visit NewRelic.com to obtain a valid license key, or to upgrade your account."
disconnect
end
+ # If we are using a seed and token to validate the agent, we
+ # should debug log that fact so that debug logs include a
+ # clue that token authentication is what will be used
def log_seed_token
if control.validate_seed
log.debug "Connecting with validation seed/token: #{control.validate_seed}/#{control.validate_token}"
end
end
+ # Checks whether we should send environment info, and if so,
+ # returns the snapshot from the local environment
def environment_for_connect
control['send_environment_info'] != false ? control.local_env.snapshot : []
end
+ # These validation settings are used for cases where a
+ # dynamic server is spun up for clients - partners can
+ # include a seed and token to indicate that the host is
+ # allowed to connect, rather than setting a unique hostname
def validate_settings
{
:seed => control.validate_seed,
:token => control.validate_token
}
end
+ # Initializes the hash of settings that we send to the
+ # server. Returns a literal hash containing the options
def connect_settings
{
:pid => $$,
:host => @local_host,
:app_name => control.app_names,
@@ -609,99 +707,216 @@
:environment => environment_for_connect,
:settings => control.settings,
:validate => validate_settings
}
end
+
+ # Does some simple logging to make sure that our seed and
+ # token for verification are correct, then returns the
+ # connect data passed back from the server
def connect_to_server
log_seed_token
- connect_data = invoke_remote(:connect, connect_settings)
+ @service.connect(connect_settings)
end
+ # Configures the error collector if the server says that we
+ # are allowed to send errors. Pretty simple, and logs at
+ # debug whether errors will or will not be sent.
def configure_error_collector!(server_enabled)
+ # Reinitialize the error collector
+ @error_collector = NewRelic::Agent::ErrorCollector.new
# Ask for permission to collect error data
enabled = if error_collector.config_enabled && server_enabled
error_collector.enabled = true
else
error_collector.enabled = false
end
log.debug "Errors will #{enabled ? '' : 'not '}be sent to the New Relic service."
end
+ # Random sampling is enabled based on a sample rate, which
+ # is the n in "every 1/n transactions is added regardless of
+ # its length".
+ #
+ # uses a sane default for sampling rate if the sampling rate
+ # is zero, since the collector currently sends '0' as a
+ # sampling rate for all accounts, which is probably for
+ # legacy reasons
def enable_random_samples!(sample_rate)
- sample_rate = 10 unless sample_rate.to_i > 0# a sane default for random sampling
+ sample_rate = 10 unless sample_rate.to_i > 0
@transaction_sampler.random_sampling = true
@transaction_sampler.sampling_rate = sample_rate
log.info "Transaction sampling enabled, rate = #{@transaction_sampler.sampling_rate}"
end
+ # this entire method should be done on the transaction
+ # sampler object, rather than here. We should pass in the
+ # sampler config.
+ def config_transaction_tracer
+ # Reconfigure the transaction tracer
+ @transaction_sampler.configure!
+ @sql_sampler.configure!
+ @should_send_samples = @config_should_send_samples = @transaction_sampler.config.fetch('enabled', true)
+ @should_send_random_samples = @transaction_sampler.config.fetch('random_sample', false)
+ set_sql_recording!
+ # default to 2.0, string 'apdex_f' will turn into your
+ # apdex * 4
+ @slowest_transaction_threshold = @transaction_sampler.config.fetch('transaction_threshold', 2.0).to_f
+ @slowest_transaction_threshold = apdex_f if apdex_f_threshold?
+ end
+
+ # Enables or disables the transaction tracer and sets its
+ # options based on the options provided to the
+ # method.
def configure_transaction_tracer!(server_enabled, sample_rate)
# Ask the server for permission to send transaction samples.
# determined by subscription license.
+ @transaction_sampler.config['enabled'] = server_enabled
+ @sql_sampler.configure!
@should_send_samples = @config_should_send_samples && server_enabled
-
+
if @should_send_samples
# I don't think this is ever true, but...
enable_random_samples!(sample_rate) if @should_send_random_samples
+
+ @transaction_sampler.slow_capture_threshold = @slowest_transaction_threshold
+
log.debug "Transaction tracing threshold is #{@slowest_transaction_threshold} seconds."
else
log.debug "Transaction traces will not be sent to the New Relic service."
end
end
- def set_collector_host!
- host = invoke_remote(:get_redirect_host)
- if host
- @collector = control.server_from_host(host)
+ # apdex_f is always 4 times the apdex_t
+ def apdex_f
+ (4 * NewRelic::Control.instance.apdex_t).to_f
+ end
+
+ # If the transaction threshold is set to the string
+ # 'apdex_f', we use 4 times the apdex_t value to record
+ # transactions. This gears well with using apdex since you
+ # will attempt to send any transactions that register as 'failing'
+ def apdex_f_threshold?
+ @transaction_sampler.config.fetch('transaction_threshold', '') =~ /apdex_f/i
+ end
+
+ # Sets the sql recording configuration by trying to detect
+ # any attempt to disable the sql collection - 'off',
+ # 'false', 'none', and friends. Otherwise, we accept 'raw',
+ # and unrecognized values default to 'obfuscated'
+ def set_sql_recording!
+ record_sql_config = @transaction_sampler.config.fetch('record_sql', :obfuscated)
+ case record_sql_config.to_s
+ when 'off'
+ @record_sql = :off
+ when 'none'
+ @record_sql = :off
+ when 'false'
+ @record_sql = :off
+ when 'raw'
+ @record_sql = :raw
+ else
+ @record_sql = :obfuscated
end
+
+ log_sql_transmission_warning?
end
- def query_server_for_configuration
- set_collector_host!
+ # Warn the user when we are sending raw sql across the wire
+ # - they should probably be using ssl when this is true
+ def log_sql_transmission_warning?
+ log.warn("Agent is configured to send raw SQL to the service") if @record_sql == :raw
+ end
+ # Asks the collector to tell us which sub-collector we
+ # should be reporting to, and then does the name resolution
+ # on that host so we don't block on DNS during the normal
+ # course of agent processing
+# def set_collector_host!
+# host = invoke_remote(:get_redirect_host)
+# if host
+# @collector = control.server_from_host(host)
+# end
+# end
+
+ # Sets the collector host and connects to the server, then
+ # invokes the final configuration with the returned data
+ def query_server_for_configuration
finish_setup(connect_to_server)
end
+
+ # Takes a hash of configuration data returned from the
+ # server and uses it to set local variables and to
+ # initialize various parts of the agent that are configured
+ # separately.
+ #
+ # Can accommodate most arbitrary data - anything extra is
+ # ignored unless we say to do something with it here.
def finish_setup(config_data)
- @agent_id = config_data['agent_run_id']
+ return if config_data == nil
+ @service.agent_id = config_data['agent_run_id']
@report_period = config_data['data_report_period']
@url_rules = config_data['url_rules']
@beacon_configuration = BeaconConfiguration.new(config_data)
+ @server_side_config_enabled = config_data['listen_to_server_config']
+ if @server_side_config_enabled
+ log.info "Using config from server"
+ log.debug "Server provided config: #{config_data.inspect}"
+ end
+
+ control.merge_server_side_config(config_data) if @server_side_config_enabled
+ config_transaction_tracer
log_connection!(config_data)
configure_transaction_tracer!(config_data['collect_traces'], config_data['sample_rate'])
configure_error_collector!(config_data['collect_errors'])
end
-
+
+ # Logs when we connect to the server, for debugging purposes
+ # - makes sure we know if an agent has not connected
def log_connection!(config_data)
- control.log! "Connected to NewRelic Service at #{@collector}"
- log.debug "Agent Run = #{@agent_id}."
+ control.log! "Connected to NewRelic Service at #{@service.collector.name}"
+ log.debug "Agent Run = #{@service.agent_id}."
log.debug "Connection data = #{config_data.inspect}"
end
end
include Connect
+
+ # Serialize all the important data that the agent might want
+ # to send to the server. We could be sending this to file (
+ # common in short-running background transactions ) or
+ # alternately we could serialize via a pipe or socket to a
+ # local aggregation device
def serialize
accumulator = []
accumulator[1] = harvest_transaction_traces if @transaction_sampler
accumulator[2] = harvest_errors if @error_collector
accumulator[0] = harvest_timeslice_data
+ reset_stats
+ @metric_ids = {}
accumulator
end
-
public :serialize
+ # Accepts data as provided by the serialize method and merges
+ # it into our current collection of data to send. Can be
+ # dangerous if we re-merge the same data more than once - it
+ # will be sent multiple times.
def merge_data_from(data)
metrics, transaction_traces, errors = data
@stats_engine.merge_data(metrics) if metrics
- if transaction_traces
+ if transaction_traces && transaction_traces.respond_to?(:any?) &&
+ transaction_traces.any?
if @traces
- @traces = @traces + transaction_traces
+ @traces += transaction_traces
else
@traces = transaction_traces
end
end
- if errors
+ if errors && errors.respond_to?(:any?) && errors.any?
if @unsent_errors
@unsent_errors = @unsent_errors + errors
else
@unsent_errors = errors
end
@@ -749,121 +964,144 @@
else
disconnect
end
end
+ # Who am I? Well, this method can tell you your hostname.
def determine_host
Socket.gethostname
end
+ # Delegates to the control class to determine the root
+ # directory of this project
def determine_home_directory
control.root
end
+ # Checks whether this process is a Passenger or Unicorn or Rainbows
+ # spawning server - if so, we probably don't intend to report
+ # statistics from this process
def is_application_spawner?
- $0 =~ /ApplicationSpawner|^unicorn\S* master/
+ $0 =~ /ApplicationSpawner|^unicorn\S* master|^rainbows master/
end
+ # calls the busy harvester and collects timeslice data to
+ # send later
def harvest_timeslice_data(time=Time.now)
# this creates timeslices that are harvested below
NewRelic::Agent::BusyCalculator.harvest_busy
@unsent_timeslice_data ||= {}
@unsent_timeslice_data = @stats_engine.harvest_timeslice_data(@unsent_timeslice_data, @metric_ids)
@unsent_timeslice_data
end
+ # takes an array of arrays of spec and id, adds it into the
+ # metric cache so we can save the collector some work by
+ # sending integers instead of strings
def fill_metric_id_cache(pairs_of_specs_and_ids)
Array(pairs_of_specs_and_ids).each do |metric_spec, metric_id|
@metric_ids[metric_spec] = metric_id
end
end
+ # note - exceptions are logged in invoke_remote. If an exception is encountered here,
+ # then the metric data is downsampled for another
+ # transmission later
def harvest_and_send_timeslice_data
now = Time.now
NewRelic::Agent.instance.stats_engine.get_stats_no_scope('Supportability/invoke_remote').record_data_point(0.0)
NewRelic::Agent.instance.stats_engine.get_stats_no_scope('Supportability/invoke_remote/metric_data').record_data_point(0.0)
harvest_timeslice_data(now)
- begin
- # In this version of the protocol, we get back an assoc array of spec to id.
- metric_specs_and_ids = invoke_remote(:metric_data, @agent_id,
- @last_harvest_time.to_f,
- now.to_f,
- @unsent_timeslice_data.values)
-
- rescue Timeout::Error
- # assume that the data was received. chances are that it was
- metric_specs_and_ids = []
- end
-
+ # In this version of the protocol, we get back an assoc array of spec to id.
+ metric_specs_and_ids = @service.metric_data(@last_harvest_time.to_f,
+ now.to_f,
+ @unsent_timeslice_data.values)
+ metric_specs_and_ids ||= []
fill_metric_id_cache(metric_specs_and_ids)
- log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@agent_id}) in #{Time.now - now} seconds"
+ log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@service.agent_id}) in #{Time.now - now} seconds"
# if we successfully invoked this web service, then clear the unsent message cache.
@unsent_timeslice_data = {}
@last_harvest_time = now
-
- # handle_messages
-
- # note - exceptions are logged in invoke_remote. If an exception is encountered here,
- # then the metric data is downsampled for another timeslices
end
+ # Fills the traces array with the harvested transactions from
+ # the transaction sampler, subject to the setting for slowest
+ # transaction threshold
def harvest_transaction_traces
@traces = @transaction_sampler.harvest(@traces, @slowest_transaction_threshold)
@traces
end
+ def harvest_and_send_slowest_sql
+ # FIXME add the code to try to resend if our connection is down
+ sql_traces = @sql_sampler.harvest
+ unless sql_traces.empty?
+ log.debug "Sending (#{sql_traces.size}) sql traces"
+ begin
+ @service.sql_trace_data(sql_traces)
+ rescue
+ @sql_sampler.merge sql_traces
+ end
+ end
+ end
+
+ # This handles getting the transaction traces and then sending
+ # them across the wire. This includes gathering SQL
+ # explanations, stripping out stack traces, and normalizing
+ # SQL. note that we explain only the sql statements whose
+ # segments' execution times exceed our threshold (to avoid
+ # unnecessary overhead of running explains on fast queries.)
def harvest_and_send_slowest_sample
harvest_transaction_traces
unless @traces.empty?
now = Time.now
log.debug "Sending (#{@traces.length}) transaction traces"
+
begin
- # take the traces and prepare them for sending across the
- # wire. This includes gathering SQL explanations, stripping
- # out stack traces, and normalizing SQL. note that we
- # explain only the sql statements whose segments' execution
- # times exceed our threshold (to avoid unnecessary overhead
- # of running explains on fast queries.)
options = { :keep_backtraces => true }
options[:record_sql] = @record_sql unless @record_sql == :off
- options[:explain_sql] = @explain_threshold if @explain_enabled
+ if @transaction_sampler.explain_enabled
+ options[:explain_sql] = @transaction_sampler.explain_threshold
+ end
traces = @traces.collect {|trace| trace.prepare_to_send(options)}
- invoke_remote :transaction_sample_data, @agent_id, traces
+ @service.transaction_sample_data(traces)
rescue PostTooBigException
# we tried to send too much data, drop the first trace and
# try again
retry if @traces.shift
end
- log.debug "Sent slowest sample (#{@agent_id}) in #{Time.now - now} seconds"
+ log.debug "Sent slowest sample (#{@service.agent_id}) in #{Time.now - now} seconds"
end
# if we succeed sending this sample, then we don't need to keep
# the slowest sample around - it has been sent already and we
- # can collect the next one
+ # can clear the collection and move on
@traces = nil
-
- # note - exceptions are logged in invoke_remote. If an
- # exception is encountered here, then the slowest sample of is
- # determined of the entire period since the last reported
- # sample.
end
+ # Gets the collection of unsent errors from the error
+ # collector. We pass back in an existing array of errors that
+ # may be left over from a previous send
def harvest_errors
@unsent_errors = @error_collector.harvest_errors(@unsent_errors)
@unsent_errors
end
+ # Handles getting the errors from the error collector and
+ # sending them to the server, and any error cases like trying
+ # to send very large errors - we drop the oldest error on the
+ # floor and try again
def harvest_and_send_errors
harvest_errors
if @unsent_errors && @unsent_errors.length > 0
log.debug "Sending #{@unsent_errors.length} errors"
begin
- invoke_remote :error_data, @agent_id, @unsent_errors
+ @service.error_data(@unsent_errors)
rescue PostTooBigException
@unsent_errors.shift
retry
end
# if the remote invocation fails, then we never clear
@@ -871,168 +1109,52 @@
# the next heartbeat. Note the error collector maxes out at
# 20 instances to prevent leakage
@unsent_errors = []
end
end
-
- def compress_data(object)
- dump = Marshal.dump(object)
-
- # this checks to make sure mongrel won't choke on big uploads
- check_post_size(dump)
-
- # we currently optimize for CPU here since we get roughly a 10x
- # reduction in message size with this, and CPU overhead is at a
- # premium. For extra-large posts, we use the higher compression
- # since otherwise it actually errors out.
-
- dump_size = dump.size
-
- # Compress if content is smaller than 64kb. There are problems
- # with bugs in Ruby in some versions that expose us to a risk of
- # segfaults if we compress aggressively.
- return [dump, 'identity'] if dump_size < (64*1024)
-
- # medium payloads get fast compression, to save CPU
- # big payloads get all the compression possible, to stay under
- # the 2,000,000 byte post threshold
- compression = dump_size < 2000000 ? Zlib::BEST_SPEED : Zlib::BEST_COMPRESSION
-
- [Zlib::Deflate.deflate(dump, compression), 'deflate']
- end
-
- def check_post_size(post_string)
- # TODO: define this as a config option on the server side
- return if post_string.size < control.post_size_limit
- log.warn "Tried to send too much data: #{post_string.size} bytes"
- raise PostTooBigException
- end
-
- def send_request(opts)
- request = Net::HTTP::Post.new(opts[:uri], 'CONTENT-ENCODING' => opts[:encoding], 'HOST' => opts[:collector].name)
- request['user-agent'] = user_agent
- request.content_type = "application/octet-stream"
- request.body = opts[:data]
-
- log.debug "Connect to #{opts[:collector]}#{opts[:uri]}"
-
- response = nil
- http = control.http_connection(collector)
- http.read_timeout = nil
- begin
- NewRelic::TimerLib.timeout(@request_timeout) do
- response = http.request(request)
- end
- rescue Timeout::Error
- log.warn "Timed out trying to post data to New Relic (timeout = #{@request_timeout} seconds)" unless @request_timeout < 30
- raise
- end
- if response.is_a? Net::HTTPServiceUnavailable
- raise NewRelic::Agent::ServerConnectionException, "Service unavailable (#{response.code}): #{response.message}"
- elsif response.is_a? Net::HTTPGatewayTimeOut
- log.debug("Timed out getting response: #{response.message}")
- raise Timeout::Error, response.message
- elsif response.is_a? Net::HTTPRequestEntityTooLarge
- raise PostTooBigException
- elsif !(response.is_a? Net::HTTPSuccess)
- raise NewRelic::Agent::ServerConnectionException, "Unexpected response from server (#{response.code}): #{response.message}"
- end
- response
- end
-
- def decompress_response(response)
- if response['content-encoding'] != 'gzip'
- log.debug "Uncompressed content returned"
- return response.body
- end
- log.debug "Decompressing return value"
- i = Zlib::GzipReader.new(StringIO.new(response.body))
- i.read
- end
-
- def check_for_exception(response)
- dump = decompress_response(response)
- value = Marshal.load(dump)
- raise value if value.is_a? Exception
- value
- end
-
- def remote_method_uri(method)
- uri = "/agent_listener/#{PROTOCOL_VERSION}/#{control.license_key}/#{method}"
- uri << "?run_id=#{@agent_id}" if @agent_id
- uri
- end
-
- def user_agent
- ruby_description = ''
- # note the trailing space!
- ruby_description << "(ruby #{::RUBY_VERSION} #{::RUBY_PLATFORM}) " if defined?(::RUBY_VERSION) && defined?(::RUBY_PLATFORM)
- zlib_version = ''
- zlib_version << "zlib/#{Zlib.zlib_version}" if defined?(::Zlib) && Zlib.respond_to?(:zlib_version)
- "NewRelic-RubyAgent/#{NewRelic::VERSION::STRING} #{ruby_description}#{zlib_version}"
- end
-
- # send a message via post
- def invoke_remote(method, *args)
- now = Time.now
- #determines whether to zip the data or send plain
- post_data, encoding = compress_data(args)
-
- response = send_request({:uri => remote_method_uri(method), :encoding => encoding, :collector => collector, :data => post_data})
-
- # raises the right exception if the remote server tells it to die
- return check_for_exception(response)
- rescue NewRelic::Agent::ForceRestartException => e
- log.info e.message
- raise
- rescue SystemCallError, SocketError => e
- # These include Errno connection errors
- raise NewRelic::Agent::ServerConnectionException, "Recoverable error connecting to the server: #{e}"
+
+ def transmit_data
+ log.debug "Sending data to New Relic Service"
+ harvest_and_send_errors
+ harvest_and_send_slowest_sample
+ harvest_and_send_slowest_sql
+ harvest_and_send_timeslice_data
+ rescue => e
+ retry_count ||= 0
+ retry_count += 1
+ retry unless retry_count > 1
+ raise e
ensure
- NewRelic::Agent.instance.stats_engine.get_stats_no_scope('Supportability/invoke_remote').record_data_point((Time.now - now).to_f)
- NewRelic::Agent.instance.stats_engine.get_stats_no_scope('Supportability/invoke_remote/' + method.to_s).record_data_point((Time.now - now).to_f)
+ NewRelic::Agent::Database.close_connections unless forked?
end
+ # This method contacts the server to send remaining data and
+ # let the server know that the agent is shutting down - this
+ # allows us to do things like accurately set the end of the
+ # lifetime of the process
+ #
+ # If this process comes from a parent process, it will not
+ # disconnect, so that the parent process can continue to send data
def graceful_disconnect
if @connected
begin
- @request_timeout = 10
- if NewRelic::DataSerialization.should_send_data?
- log.debug "Sending data to New Relic Service"
- NewRelic::Agent.load_data
- harvest_and_send_errors
- harvest_and_send_slowest_sample
- harvest_and_send_timeslice_data
- else
- log.debug "Serializing agent data to disk"
- NewRelic::Agent.save_data
- end
- if @connected_pid == $$
+ @service.request_timeout = 10
+ transmit_data
+ if @connected_pid == $$ && !@service.kind_of?(NewRelic::Agent::NewRelicService)
log.debug "Sending New Relic service agent run shutdown message"
- invoke_remote :shutdown, @agent_id, Time.now.to_f
+ @service.shutdown(Time.now.to_f)
else
log.debug "This agent connected from parent process #{@connected_pid}--not sending shutdown"
end
log.debug "Graceful disconnect complete"
rescue Timeout::Error, StandardError
end
else
log.debug "Bypassing graceful disconnect - agent not connected"
end
end
- def default_sql_obfuscator(sql)
- sql = sql.dup
- # This is hardly readable. Use the unit tests.
- # remove single quoted strings:
- sql.gsub!(/'(.*?[^\\'])??'(?!')/, '?')
- # remove double quoted strings:
- sql.gsub!(/"(.*?[^\\"])??"(?!")/, '?')
- # replace all number literals
- sql.gsub!(/\d+/, "?")
- sql
- end
end
-
+
extend ClassMethods
include InstanceMethods
include BrowserMonitoring
end
end