lib/new_relic/agent/agent.rb in newrelic_rpm-2.9.9 vs lib/new_relic/agent/agent.rb in newrelic_rpm-2.10.3
- old
+ new
@@ -6,27 +6,31 @@
require 'stringio'
# The NewRelic Agent collects performance data from ruby applications
# in realtime as the application runs, and periodically sends that
# data to the NewRelic server.
-module NewRelic::Agent
+module NewRelic
+ module Agent
# The Agent is a singleton that is instantiated when the plugin is
# activated.
class Agent
# Specifies the version of the agent's communication protocol with
# the NewRelic hosted site.
- PROTOCOL_VERSION = 5
+ PROTOCOL_VERSION = 8
attr_reader :obfuscator
attr_reader :stats_engine
attr_reader :transaction_sampler
attr_reader :error_collector
- attr_reader :worker_loop
+ attr_reader :task_loop
attr_reader :record_sql
+ attr_reader :histogram
+ attr_reader :metric_ids
+ attr_reader :should_send_errors
# Should only be called by NewRelic::Control
# Returns the shared Agent object, creating it on the first call and
# memoizing it in the class-level @instance variable.
def self.instance
@instance ||= self.new
end
@@ -39,32 +43,36 @@
# for passenger where processes are forked and the agent is
# dormant
#
# Starts the worker thread and the stats engine's sampler thread when
# running? reports that no worker loop is active for this process.
# Returns immediately unless the agent is enabled, in monitor mode, and
# the license has not been flagged invalid.
def ensure_worker_thread_started
return unless control.agent_enabled? && control.monitor_mode? && !@invalid_license
- if !running?
+ if !running?
+ # We got some reports of threading errors in Unicorn with this.
# The trailing `rescue nil` swallows any error raised by the log call
# itself so the restart below still happens.
+ log.debug "Detected that the worker loop is not running. Restarting." rescue nil
+ # Assume we've been forked, clear out stats that are left over from parent process
+ reset_stats
launch_worker_thread
@stats_engine.spawn_sampler_thread
end
end
# True if the worker thread has been started. Doesn't necessarily
# mean we are connected
def running?
# All four conditions must hold: agent enabled, monitor mode, a loop
# object exists, and that loop's pid equals the current process id ($$).
# NOTE(review): presumably the pid comparison catches a loop object
# inherited across fork -- confirm against WorkerLoop#pid.
- control.agent_enabled? && control.monitor_mode? && @worker_loop && @worker_loop.pid == $$
+ control.agent_enabled? && control.monitor_mode? && @task_loop && @task_loop.pid == $$
end
# True if we have initialized and completed 'start'
def started?
# Returns the raw @started value, so the result is truthy/nil rather
# than a strict boolean.
@started
end
# Attempt a graceful shutdown of the agent.
def shutdown
return if not started?
- if @worker_loop
- @worker_loop.stop
+ if @task_loop
+ @task_loop.stop
log.debug "Starting Agent shutdown"
# if litespeed, then ignore all future SIGUSR1 - it's
# litespeed trying to shut us down
@@ -83,41 +91,38 @@
end
@started = nil
end
# Delegates to the stats engine's start_transaction.
# The removed (-) line additionally cleared the thread-local
# :custom_params entry before delegating.
def start_transaction
- Thread::current[:custom_params] = nil
@stats_engine.start_transaction
end
# Delegates to the stats engine's end_transaction.
# The removed (-) line additionally cleared the thread-local
# :custom_params entry before delegating.
def end_transaction
- Thread::current[:custom_params] = nil
@stats_engine.end_transaction
end
# Stores should_record in the thread-local :record_sql slot and returns
# the value that was in effect before the call.
def set_record_sql(should_record)
prev = Thread::current[:record_sql]
Thread::current[:record_sql] = should_record
-
- prev || true
# `prev || true` evaluated to true even when the previous flag was false.
# `prev.nil? || prev` yields true only when no flag had been set and
# otherwise returns prev unchanged (including false).
+ prev.nil? || prev
end
# Stores should_record in the thread-local :record_tt slot and returns
# the value that was in effect before the call.
def set_record_tt(should_record)
prev = Thread::current[:record_tt]
Thread::current[:record_tt] = should_record
-
- prev || true
# `prev || true` evaluated to true even when the previous flag was false.
# `prev.nil? || prev` yields true only when no flag had been set and
# otherwise returns prev unchanged (including false).
+ prev.nil? || prev
end
-
- def add_custom_parameters(params)
- p = Thread::current[:custom_params] || (Thread::current[:custom_params] = {})
-
- p.merge!(params)
+ # Push flag indicating whether we should be tracing in this
+ # thread.
+ def push_trace_execution_flag(should_trace=false)
# The flags are kept in a per-thread array under :newrelic_untraced,
# created on the first push. The expression returns that array.
+ (Thread.current[:newrelic_untraced] ||= []) << should_trace
end
-
- def custom_params
- Thread::current[:custom_params] || {}
+
+ # Pop the current trace execution status. Restore trace execution status
+ # to what it was before we pushed the current flag.
+ def pop_trace_execution_flag
# Returns the popped flag, or nil when the per-thread array is empty or
# has never been created on this thread.
+ Thread.current[:newrelic_untraced].pop if Thread.current[:newrelic_untraced]
end
def set_sql_obfuscator(type, &block)
if type == :before
@obfuscator = NewRelic::ChainedCall.new(block, @obfuscator)
@@ -145,15 +150,16 @@
end
return if !control.agent_enabled?
@local_host = determine_host
- log.info "Web container: #{control.dispatcher.to_s}"
-
- if control.dispatcher == :passenger
- log.warn "Phusion Passenger has been detected. Some RPM memory statistics may have inaccuracies due to short process lifespans."
+ if control.dispatcher.nil? || control.dispatcher.to_s.empty?
+ log.info "No dispatcher detected."
+ else
+ log.info "Dispatcher: #{control.dispatcher.to_s}"
end
+ log.info "Application: #{control.app_names.join(", ")}" unless control.app_names.empty?
@started = true
sampler_config = control.fetch('transaction_tracer', {})
@use_transaction_sampler = sampler_config.fetch('enabled', true)
@@ -163,11 +169,11 @@
# use transaction_threshold: 4.0 to force the TT collection
# threshold to 4 seconds
# use transaction_threshold: apdex_f to use your apdex t value
# multiplied by 4
# undefined transaction_threshold defaults to 2.0
- apdex_f = 4 * NewRelic::Control.instance['apdex_t'].to_f
+ apdex_f = 4 * NewRelic::Control.instance.apdex_t
@slowest_transaction_threshold = sampler_config.fetch('transaction_threshold', 2.0)
if @slowest_transaction_threshold =~ /apdex_f/i
@slowest_transaction_threshold = apdex_f
end
@slowest_transaction_threshold = @slowest_transaction_threshold.to_f
@@ -183,12 +189,10 @@
log.warn "Agent is configured to send raw SQL to RPM service" if @record_sql == :raw
# Initialize transaction sampler
@transaction_sampler.random_sampling = @random_sample
if control.monitor_mode?
- # make sure the license key exists and is likely to be really a license key
- # by checking it's string length (license keys are 40 character strings.)
if !control.license_key
@invalid_license = true
control.log! "No license key found. Please edit your newrelic.yml file and insert your license key.", :error
elsif control.license_key.length != 40
@invalid_license = true
@@ -196,11 +200,11 @@
else
launch_worker_thread
# When the VM shuts down, attempt to send a message to the
# server that this agent run is stopping, assuming it has
# successfully connected
- # This shutdown handler doesn't work if Sinatra or Unicorn is running
+ # This shutdown handler doesn't work if Sinatra is running
# because it executes in the shutdown handler!
at_exit { shutdown } unless [:sinatra, :unicorn].include? NewRelic::Control.instance.dispatcher
end
end
control.log! "New Relic RPM Agent #{NewRelic::VERSION::STRING} Initialized: pid = #{$$}"
@@ -212,88 +216,74 @@
@collector ||= control.server
end
# Connect to the server, and run the worker loop forever.
# Will not return.
# Registers the periodic harvest tasks on the loop object and then runs it.
# Timeslice data is always scheduled; slowest-sample and error sends are
# scheduled only when their flags are set. When samples are not being sent
# and developer mode is off, the transaction sampler is disabled instead.
# Every task uses @report_period as its interval.
# Any StandardError marks the agent as disconnected and is re-raised.
# NOTE(review): on the + side this method no longer calls connect, so the
# "Connect to the server" wording in the comment above it only matches the
# removed run_worker_loop. launch_worker_thread now connects first and
# calls this only when @connected is set.
- def run_worker_loop
-
- # connect to the server. this will keep retrying until
- # successful or it determines the license is bad.
- connect
+ def run_task_loop
+ # determine the reporting period (server based)
+ # note if the agent attempts to report more frequently than
+ # the specified report data, then it will be ignored.
- # We may not be connected now but keep going for dev mode
- if @connected
- begin
- # determine the reporting period (server based)
- # note if the agent attempts to report more frequently than
- # the specified report data, then it will be ignored.
-
- control.log! "Reporting performance data every #{@report_period} seconds."
- @worker_loop.add_task(@report_period) do
- harvest_and_send_timeslice_data
- end
-
- if @should_send_samples && @use_transaction_sampler
- @worker_loop.add_task(@report_period) do
- harvest_and_send_slowest_sample
- end
- elsif !control.developer_mode?
- # We still need the sampler for dev mode.
- @transaction_sampler.disable
- end
-
- if @should_send_errors && @error_collector.enabled
- @worker_loop.add_task(@report_period) do
- harvest_and_send_errors
- end
- end
- @worker_loop.run
- rescue StandardError
- @connected = false
- raise
+ control.log! "Reporting performance data every #{@report_period} seconds."
+ @task_loop.add_task(@report_period, "Timeslice Data Send") do
+ harvest_and_send_timeslice_data
+ end
+
+ if @should_send_samples && @use_transaction_sampler
+ @task_loop.add_task(@report_period, "Transaction Sampler Send") do
+ harvest_and_send_slowest_sample
end
+ elsif !control.developer_mode?
+ # We still need the sampler for dev mode.
+ @transaction_sampler.disable
end
+
+ if @should_send_errors && @error_collector.enabled
+ @task_loop.add_task(@report_period, "Error Send") do
+ harvest_and_send_errors
+ end
+ end
+ log.debug "Running worker loop"
+ @task_loop.run
+ rescue StandardError
+ log.debug "Error in worker loop: #{$!}"
+ @connected = false
+ raise
end
# Creates a fresh WorkerLoop and starts the background thread that
# connects to the service and runs the harvest loop. The thread is labelled
# 'Worker Loop' through its thread-local 'newrelic_label' key.
# Returns early, without creating a thread, when the dispatcher is
# :passenger and the program name ($0) matches /ApplicationSpawner/.
def launch_worker_thread
if (control.dispatcher == :passenger && $0 =~ /ApplicationSpawner/)
log.debug "Process is passenger spawner - don't connect to RPM service"
return
end
- @worker_loop = WorkerLoop.new(log)
+ @task_loop = WorkerLoop.new(log)
- if control['check_bg_loading']
- log.warn "Agent background loading checking turned on"
- require 'new_relic/agent/patch_const_missing'
- ClassLoadingWatcher.enable_warning
- end
-
+ log.debug "Creating RPM worker thread."
@worker_thread = Thread.new do
begin
- ClassLoadingWatcher.background_thread=Thread.current if control['check_bg_loading']
-
- run_worker_loop
# On the + side the thread connects here and only enters the task loop
# when the connect attempt left @connected set.
+ NewRelic::Agent.disable_all_tracing do
+ connect
+ run_task_loop if @connected
+ end
# NOTE(review): this retry has no attempt limit. Each forced restart
# resets the stats, sleeps 30 seconds and re-runs the whole begin block.
+ rescue NewRelic::Agent::ForceRestartException => e
+ log.info e.message
+ # disconnect and start over.
+ # clear the stats engine
+ reset_stats
+ @connected = false
+ # Wait a short time before trying to reconnect
+ sleep 30
+ retry
rescue IgnoreSilentlyException
control.log! "Unable to establish connection with the server. Run with log level set to debug for more information."
- rescue StandardError => e
- control.log! e, :error
- control.log! e.backtrace.join("\n "), :error
# NOTE(review): the + side widens the rescue from StandardError to
# Exception, so non-StandardError exceptions raised in this thread are
# logged and end the thread instead of propagating.
+ rescue Exception => e
+ @connected = false
+ log.error "Terminating worker loop: #{e.class.name}: #{e}\n #{e.backtrace.join("\n ")}"
end
end
@worker_thread['newrelic_label'] = 'Worker Loop'
-
- # This code should be activated to check that no dependency
- # loading is occuring in the background thread by stopping the
- # foreground thread after the background thread is created. Turn
- # on dependency loading logging and make sure that no loading
- # occurs.
- #
- # control.log! "FINISHED AGENT INIT"
- # while true
- # sleep 1
- # end
end
# Shorthand accessor for NewRelic::Control.instance.
def control
NewRelic::Control.instance
end
@@ -301,20 +291,22 @@
# Sets up the agent's initial in-memory state: disconnected, empty metric
# id cache, launch and last-harvest timestamps set to now, and the
# collaborating stats engine, transaction sampler and error collector.
def initialize
@connected = false
@launch_time = Time.now
@metric_ids = {}
-
# The histogram is built from one tenth of the configured apdex_t.
+ @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10)
@stats_engine = NewRelic::Agent::StatsEngine.new
- @transaction_sampler = NewRelic::Agent::TransactionSampler.new(self)
- @error_collector = NewRelic::Agent::ErrorCollector.new(self)
# On the + side the sampler and error collector no longer receive the
# agent (self); the sampler is handed to the stats engine instead.
+ @transaction_sampler = NewRelic::Agent::TransactionSampler.new
+ @stats_engine.transaction_sampler = @transaction_sampler
+ @error_collector = NewRelic::Agent::ErrorCollector.new
# 'timeout' falls back to 2 * 60 when not configured; send_request's log
# message reports this value in seconds.
@request_timeout = NewRelic::Control.instance.fetch('timeout', 2 * 60)
@invalid_license = false
@last_harvest_time = Time.now
# The default obfuscator is a bound Method object for default_sql_obfuscator.
+ @obfuscator = method(:default_sql_obfuscator)
end
# Connect to the server and validate the license. If successful,
# @connected has true when finished. If not successful, you can
# keep calling this. Return false if we could not establish a
@@ -325,20 +317,21 @@
connect_retry_period = 5
connect_attempts = 0
@agent_id = nil
begin
sleep connect_retry_period.to_i
+ environment = control['send_environment_info'] != false ? control.local_env.snapshot : []
@agent_id ||= invoke_remote :start, @local_host, {
:pid => $$,
:launch_time => @launch_time.to_f,
:agent_version => NewRelic::VERSION::STRING,
- :environment => control.local_env.snapshot,
+ :environment => environment,
:settings => control.settings,
:validate_seed => ENV['NR_VALIDATE_SEED'],
:validate_token => ENV['NR_VALIDATE_TOKEN'] }
- host = invoke_remote(:get_redirect_host) rescue nil
+ host = invoke_remote(:get_redirect_host)
@collector = control.server_from_host(host) if host
@report_period = invoke_remote :get_data_report_period, @agent_id
@@ -350,12 +343,11 @@
@should_send_samples = invoke_remote :should_collect_samples, @agent_id
if @should_send_samples
sampling_rate = invoke_remote :sampling_rate, @agent_id if @random_sample
@transaction_sampler.sampling_rate = sampling_rate
-
- log.info "Transaction sample rate: #{@transaction_sampler.sampling_rate}"
+ log.info "Transaction sample rate: #{@transaction_sampler.sampling_rate}" if sampling_rate
end
# Ask for permission to collect error data
@should_send_errors = invoke_remote :should_collect_errors, @agent_id
@@ -396,40 +388,45 @@
end
# Returns control.root.
def determine_home_directory
control.root
end
# Discards accumulated agent state: resets the stats engine, empties the
# metric id cache and the unsent error and timeslice buffers, drops any
# held traces, restarts the harvest and launch timestamps at the current
# time, and rebuilds the histogram from apdex_t / 10.
+ def reset_stats
+ @stats_engine.reset_stats
+ @metric_ids = {}
+ @unsent_errors = []
+ @traces = nil
+ @unsent_timeslice_data = {}
+ @last_harvest_time = Time.now
+ @launch_time = Time.now
+ @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10)
+ end
def harvest_and_send_timeslice_data
- NewRelic::Agent::Instrumentation::DispatcherInstrumentation::BusyCalculator.harvest_busy
+ NewRelic::Agent::BusyCalculator.harvest_busy
now = Time.now
-
- # Fixme: remove the harvest thread tracking
- @harvest_thread ||= Thread.current
-
- if @harvest_thread != Thread.current
- log.error "Two harvest threads are running (current=#{Thread.current}, harvest=#{@harvest_thread}"
- @harvest_thread = Thread.current
- end
-
+
@unsent_timeslice_data ||= {}
@unsent_timeslice_data = @stats_engine.harvest_timeslice_data(@unsent_timeslice_data, @metric_ids)
begin
+ # In this version of the protocol, we get back an assoc array of spec to id.
metric_ids = invoke_remote(:metric_data, @agent_id,
@last_harvest_time.to_f,
now.to_f,
@unsent_timeslice_data.values)
rescue Timeout::Error
# assume that the data was received. chances are that it was
metric_ids = nil
end
- @metric_ids.merge! metric_ids if metric_ids
+ metric_ids.each do | spec, id |
+ @metric_ids[spec] = id
+ end if metric_ids
log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@agent_id}) in #{Time.now - now} seconds"
# if we successfully invoked this web service, then clear the unsent message cache.
@unsent_timeslice_data = {}
@@ -453,21 +450,18 @@
# out stack traces, and normalizing SQL. note that we
# explain only the sql statements whose segments' execution
# times exceed our threshold (to avoid unnecessary overhead
# of running explains on fast queries.)
traces = @traces.collect {|trace| trace.prepare_to_send(:explain_sql => @explain_threshold, :record_sql => @record_sql, :keep_backtraces => true, :explain_enabled => @explain_enabled)}
-
-
invoke_remote :transaction_sample_data, @agent_id, traces
rescue PostTooBigException
# we tried to send too much data, drop the first trace and
# try again
- @traces.shift
- retry
+ retry if @traces.shift
end
- log.debug "#{now}: sent slowest sample (#{@agent_id}) in #{Time.now - now} seconds"
+ log.debug "Sent slowest sample (#{@agent_id}) in #{Time.now - now} seconds"
end
# if we succeed sending this sample, then we don't need to keep
# the slowest sample around - it has been sent already and we
# can collect the next one
@@ -498,10 +492,13 @@
end
def compress_data(object)
dump = Marshal.dump(object)
+ # this checks to make sure mongrel won't choke on big uploads
+ check_post_size(dump)
+
# we currently optimize for CPU here since we get roughly a 10x
# reduction in message size with this, and CPU overhead is at a
# premium. For extra-large posts, we use the higher compression
# since otherwise it actually errors out.
@@ -518,34 +515,39 @@
[Zlib::Deflate.deflate(dump, compression), 'deflate']
end
# Guards against oversized posts. Returns nil when post_string.size is
# below the limit; otherwise logs a warning and raises PostTooBigException.
# The limit was the literal 2000000 on the - side and is read from
# control.post_size_limit on the + side.
def check_post_size(post_string)
# TODO: define this as a config option on the server side
- return if post_string.size < 2000000
- log.warn "Tried to send too much data, retrying with less: #{post_string.size} bytes"
+ return if post_string.size < control.post_size_limit
+ log.warn "Tried to send too much data: #{post_string.size} bytes"
raise PostTooBigException
end
# Posts opts[:data] to opts[:uri] as application/octet-stream and returns
# the response object when it is a Net::HTTPSuccess.
# opts keys read here: :uri, :encoding, :collector, :data.
# Failure handling on the + side:
# - client-side timeout: warning logged (unless the timeout is under 30),
#   then Timeout::Error is re-raised
# - Net::HTTPServiceUnavailable: IgnoreSilentlyException
# - Net::HTTPGatewayTimeOut: Timeout::Error carrying the response message
# - any other non-success response: IgnoreSilentlyException
def send_request(opts)
request = Net::HTTP::Post.new(opts[:uri], 'CONTENT-ENCODING' => opts[:encoding], 'ACCEPT-ENCODING' => 'gzip', 'HOST' => opts[:collector].name)
request.content_type = "application/octet-stream"
request.body = opts[:data]
- log.debug "connect to #{opts[:collector]}#{opts[:uri]}"
+ log.debug "Connect to #{opts[:collector]}#{opts[:uri]}"
response = nil
# NOTE(review): the connection is built from `collector` rather than from
# opts[:collector], which is what the HOST header and the log line use.
# Confirm the two always refer to the same server.
http = control.http_connection(collector)
begin
timeout(@request_timeout) do
response = http.request(request)
end
rescue Timeout::Error
log.warn "Timed out trying to post data to RPM (timeout = #{@request_timeout} seconds)" unless @request_timeout < 30
- raise IgnoreSilentlyException
+ raise
end
-
- if !(response.is_a? Net::HTTPSuccess)
+ if response.is_a? Net::HTTPServiceUnavailable
+ log.debug(response.body || response.message)
+ raise IgnoreSilentlyException
+ elsif response.is_a? Net::HTTPGatewayTimeOut
+ log.debug("Timed out getting response: #{response.message}")
+ raise Timeout::Error, response.message
+ elsif !(response.is_a? Net::HTTPSuccess)
log.debug "Unexpected response from server: #{response.code}: #{response.message}"
raise IgnoreSilentlyException
end
response
end
@@ -576,17 +578,17 @@
# send a message via post
def invoke_remote(method, *args)
#determines whether to zip the data or send plain
post_data, encoding = compress_data(args)
- # this checks to make sure mongrel won't choke on big uploads
- check_post_size(post_data)
-
response = send_request({:uri => remote_method_uri(method), :encoding => encoding, :collector => collector, :data => post_data})
# raises the right exception if the remote server tells it to die
return check_for_exception(response)
+ rescue ForceRestartException => e
+ log.info e.message
+ raise
rescue ForceDisconnectException => e
log.error "RPM forced this agent to disconnect (#{e.message})\n" \
"Restart this process to resume monitoring via rpm.newrelic.com."
# when a disconnect is requested, stop the current thread, which
# is the worker thread that gathers data and talks to the
@@ -598,25 +600,40 @@
log.debug "Recoverable error connecting to the server: #{e}"
raise IgnoreSilentlyException
end
# Best-effort shutdown notification. When connected, shortens the request
# timeout, flushes pending timeslice and error data (+ side), and sends the
# :shutdown message with the current time. Timeout::Error and any
# StandardError raised along the way are deliberately swallowed.
# When not connected it only logs that the shutdown message was bypassed.
def graceful_disconnect
- if @connected && !(control.server.name == "localhost" && control.dispatcher_instance_id == '3000')
+ if @connected
begin
log.debug "Sending graceful shutdown message to #{control.server}"
# The shortened timeout is assigned to @request_timeout and is not
# restored afterwards.
- @request_timeout = 10
+ @request_timeout = 5
log.debug "Sending RPM service agent run shutdown message"
+ harvest_and_send_timeslice_data
+# harvest_and_send_slowest_sample
+ harvest_and_send_errors
invoke_remote :shutdown, @agent_id, Time.now.to_f
log.debug "Graceful shutdown complete"
rescue Timeout::Error, StandardError
end
else
log.debug "Bypassing graceful shutdown - agent not connected"
end
end
# Returns a copy of sql with quoted string literals and digit runs replaced
# by '?'. The argument itself is not modified because the substitutions
# run on a dup.
+ def default_sql_obfuscator(sql)
+ sql = sql.dup
+ # This is hardly readable. Use the unit tests.
+ # remove single quoted strings:
+ sql.gsub!(/'(.*?[^\\'])??'(?!')/, '?')
+ # remove double quoted strings:
+ sql.gsub!(/"(.*?[^\\"])??"(?!")/, '?')
+ # replace all number literals
# NOTE(review): \d+ matches every run of digits, so digits inside
# identifiers (for example a table or column name ending in a number)
# are replaced too, not only numeric literals.
+ sql.gsub!(/\d+/, "?")
+ sql
+ end
end
+end
end