lib/new_relic/agent/agent.rb in newrelic_rpm-2.10.6 vs lib/new_relic/agent/agent.rb in newrelic_rpm-2.10.8
- old
+ new
@@ -22,47 +22,49 @@
attr_reader :obfuscator
attr_reader :stats_engine
attr_reader :transaction_sampler
attr_reader :error_collector
- attr_reader :task_loop
attr_reader :record_sql
attr_reader :histogram
attr_reader :metric_ids
- attr_reader :should_send_errors
# Should only be called by NewRelic::Control
def self.instance
@instance ||= self.new
end
# This method is deprecated. Use NewRelic::Agent.manual_start
def manual_start(ignored=nil, also_ignored=nil)
raise "This method no longer supported. Instead use the class method NewRelic::Agent.manual_start"
end
- # this method makes sure that the agent is running. it's important
- # for passenger where processes are forked and the agent is
- # dormant
+ # This method attempts to detect when we're in a forked process and tries
+ # to re-establish a new agent run. It's important
+ # for passenger/unicorn/etc where processes are forked and the worker
+ # loop thread is no longer alive.
#
def ensure_worker_thread_started
- return unless control.agent_enabled? && control.monitor_mode? && !@invalid_license
- if !running?
- # We got some reports of threading errors in Unicorn with this.
- log.debug "Detected that the worker loop is not running. Restarting." rescue nil
- # Assume we've been forked, clear out stats that are left over from parent process
- reset_stats
- launch_worker_thread
- @stats_engine.spawn_sampler_thread
- end
+ return if !control.agent_enabled? || @invalid_license
+
+ # @connected gets false after we fail to connect or have an error
+ # connecting. @connected has nil if we haven't finished trying to connect.
+ # or we didn't attempt a connection because this is the master process
+ return unless @worker_thread && !@worker_thread.alive? && @connected != false
+
+ # This ensures that we don't enter this block again
+ @worker_thread = nil
+
+ # We got some reports of threading errors in Unicorn with this.
+ log.debug "Detected that the worker thread is not running in #$$. Restarting." rescue nil
+ # Assume we've been forked if there's a worker_loop already created.
+ # Clear out stats that are left over from parent process when we know the parent process
+ # did not try to establish a connection
+ reset_stats if @connected.nil?
+ start_new_run
+ @stats_engine.spawn_sampler_thread
end
- # True if the worker thread has been started. Doesn't necessarily
- # mean we are connected
- def running?
- control.agent_enabled? && control.monitor_mode? && @task_loop && @task_loop.pid == $$
- end
-
# True if we have initialized and completed 'start'
def started?
@started
end
@@ -160,11 +162,12 @@
log.info "Application: #{control.app_names.join(", ")}" unless control.app_names.empty?
@started = true
sampler_config = control.fetch('transaction_tracer', {})
- @use_transaction_sampler = sampler_config.fetch('enabled', true)
+ @should_send_samples = sampler_config.fetch('enabled', true)
+ log.info "Transaction tracing not enabled." if not @should_send_samples
@record_sql = sampler_config.fetch('record_sql', :obfuscated).to_sym
# use transaction_threshold: 4.0 to force the TT collection
# threshold to 4 seconds
@@ -176,15 +179,10 @@
if @slowest_transaction_threshold =~ /apdex_f/i
@slowest_transaction_threshold = apdex_f
end
@slowest_transaction_threshold = @slowest_transaction_threshold.to_f
- if @use_transaction_sampler
- log.info "Transaction tracing threshold is #{@slowest_transaction_threshold} seconds."
- else
- log.info "Transaction tracing not enabled."
- end
@explain_threshold = sampler_config.fetch('explain_threshold', 0.5).to_f
@explain_enabled = sampler_config.fetch('explain_enabled', true)
@random_sample = sampler_config.fetch('random_sample', false)
log.warn "Agent is configured to send raw SQL to RPM service" if @record_sql == :raw
# Initialize transaction sampler
@@ -196,93 +194,81 @@
control.log! "No license key found. Please edit your newrelic.yml file and insert your license key.", :error
elsif control.license_key.length != 40
@invalid_license = true
control.log! "Invalid license key: #{control.license_key}", :error
else
- launch_worker_thread
+ start_new_run
# When the VM shuts down, attempt to send a message to the
# server that this agent run is stopping, assuming it has
# successfully connected
# This shutdown handler doesn't work if Sinatra is running
# because it executes in the shutdown handler!
at_exit { shutdown } unless [:sinatra, :unicorn].include? NewRelic::Control.instance.dispatcher
end
end
- control.log! "New Relic RPM Agent #{NewRelic::VERSION::STRING} Initialized: pid = #{$$}"
+ control.log! "New Relic RPM Agent #{NewRelic::VERSION::STRING} Initialized: pid = #$$"
control.log! "Agent Log found in #{NewRelic::Control.instance.log_file}" if NewRelic::Control.instance.log_file
end
private
def collector
@collector ||= control.server
end
- # Connect to the server, and run the worker loop forever.
- # Will not return.
- def run_task_loop
- # determine the reporting period (server based)
- # note if the agent attempts to report more frequently than
- # the specified report data, then it will be ignored.
-
- control.log! "Reporting performance data every #{@report_period} seconds."
- @task_loop.add_task(@report_period, "Timeslice Data Send") do
- harvest_and_send_timeslice_data
- end
-
- if @should_send_samples && @use_transaction_sampler
- @task_loop.add_task(@report_period, "Transaction Sampler Send") do
- harvest_and_send_slowest_sample
- end
- elsif !control.developer_mode?
- # We still need the sampler for dev mode.
- @transaction_sampler.disable
- end
-
- if @should_send_errors && @error_collector.enabled
- @task_loop.add_task(@report_period, "Error Send") do
- harvest_and_send_errors
- end
- end
- log.debug "Running worker loop"
- @task_loop.run
- rescue StandardError
- log.debug "Error in worker loop: #{$!}"
- @connected = false
- raise
- end
-
- def launch_worker_thread
- if (control.dispatcher == :passenger && $0 =~ /ApplicationSpawner/)
- log.debug "Process is passenger spawner - don't connect to RPM service"
- return
- end
-
- @task_loop = WorkerLoop.new(log)
-
+ # Try to launch the worker thread and connect to the server
+ def start_new_run
+ @task_loop = WorkerLoop.new
log.debug "Creating RPM worker thread."
@worker_thread = Thread.new do
begin
NewRelic::Agent.disable_all_tracing do
- connect
- run_task_loop if @connected
+ # We try to connect. If this returns false that means
+ # the server rejected us for a licensing reason and we should
+ # just exit the thread. If it returns nil
+ # that means it didn't try to connect because we're in the master
+ @connected = connect
+ if @connected
+ # disable transaction sampling if disabled by the server and we're not in dev mode
+ if !control.developer_mode? && !@should_send_samples
+ @transaction_sampler.disable
+ end
+ control.log! "Reporting performance data every #{@report_period} seconds."
+ log.debug "Running worker loop"
+ # note if the agent attempts to report more frequently than allowed by the server
+ # the server will start dropping data.
+ @task_loop.run(@report_period) do
+ harvest_and_send_timeslice_data
+ harvest_and_send_slowest_sample if @should_send_samples
+ harvest_and_send_errors if error_collector.enabled
+ end
+ @connected = false
+ end
end
rescue NewRelic::Agent::ForceRestartException => e
log.info e.message
# disconnect and start over.
# clear the stats engine
reset_stats
- @connected = false
+ @connected = nil
# Wait a short time before trying to reconnect
sleep 30
retry
- rescue IgnoreSilentlyException
+ rescue ForceDisconnectException => e
+ # when a disconnect is requested, stop the current thread, which
+ # is the worker thread that gathers data and talks to the
+ # server.
+ log.error "RPM forced this agent to disconnect (#{e.message})"
+ @connected = false
+ rescue ServerConnectionException => e
control.log! "Unable to establish connection with the server. Run with log level set to debug for more information."
- rescue Exception => e
+ log.debug("#{e.class.name}: #{e.message}\n#{e.backtrace.first}")
@connected = false
+ rescue Exception => e
log.error "Terminating worker loop: #{e.class.name}: #{e}\n #{e.backtrace.join("\n ")}"
- end
- end
+ @connected = false
+ end # begin
+ end # thread new
@worker_thread['newrelic_label'] = 'Worker Loop'
end
def control
NewRelic::Control.instance
@@ -311,17 +297,22 @@
# @connected has true when finished. If not successful, you can
# keep calling this. Return false if we could not establish a
# connection with the server and we should not retry, such as if
# there's a bad license key.
def connect
+ if $0 =~ /ApplicationSpawner|master/
+ log.debug "Process is master spawner (#$0) -- don't connect to RPM service"
+ return nil
+ end
# wait a few seconds for the web server to boot, necessary in development
connect_retry_period = 5
connect_attempts = 0
@agent_id = nil
begin
sleep connect_retry_period.to_i
environment = control['send_environment_info'] != false ? control.local_env.snapshot : []
+ log.debug "Connecting with validation seed/token: #{control.validate_seed}/#{control.validate_token}" if control.validate_seed
@agent_id ||= invoke_remote :start, @local_host, {
:pid => $$,
:launch_time => @launch_time.to_f,
:agent_version => NewRelic::VERSION::STRING,
:environment => environment,
@@ -338,23 +329,24 @@
control.log! "Connected to NewRelic Service at #{@collector}"
log.debug "Agent ID = #{@agent_id}."
# Ask the server for permission to send transaction samples.
# determined by subscription license.
- @should_send_samples = invoke_remote :should_collect_samples, @agent_id
+ @should_send_samples &&= invoke_remote :should_collect_samples, @agent_id
if @should_send_samples
sampling_rate = invoke_remote :sampling_rate, @agent_id if @random_sample
@transaction_sampler.sampling_rate = sampling_rate
log.info "Transaction sample rate: #{@transaction_sampler.sampling_rate}" if sampling_rate
+ log.info "Transaction tracing threshold is #{@slowest_transaction_threshold} seconds."
end
# Ask for permission to collect error data
- @should_send_errors = invoke_remote :should_collect_errors, @agent_id
+ error_collector.enabled &&= invoke_remote(:should_collect_errors, @agent_id)
- log.info "Transaction traces will be sent to the RPM service" if @use_transaction_sampler && @should_send_samples
- log.info "Errors will be sent to the RPM service" if @error_collector.enabled && @should_send_errors
+ log.info "Transaction traces will be sent to the RPM service." if @should_send_samples
+ log.info "Errors will be sent to the RPM service." if error_collector.enabled
@connected = true
rescue LicenseException => e
control.log! e.message, :error
@@ -362,11 +354,11 @@
@invalid_license = true
return false
rescue Timeout::Error, StandardError => e
log.info "Unable to establish connection with New Relic RPM Service at #{control.server}"
- unless e.instance_of? IgnoreSilentlyException
+ unless e.instance_of? ServerConnectionException
log.error e.message
log.debug e.backtrace.join("\n")
end
# retry logic
connect_attempts += 1
@@ -374,11 +366,11 @@
when 1..2
connect_retry_period, period_msg = 60, "1 minute"
when 3..5 then
connect_retry_period, period_msg = 60 * 2, "2 minutes"
else
- connect_retry_period, period_msg = 10*60, "10 minutes"
+ connect_retry_period, period_msg = 10 * 60, "10 minutes"
end
log.info "Will re-attempt in #{period_msg}"
retry
end
end
@@ -425,10 +417,11 @@
metric_ids.each do | spec, id |
@metric_ids[spec] = id
end if metric_ids
log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@agent_id}) in #{Time.now - now} seconds"
+ puts "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@agent_id}) in #{Time.now - now} seconds"
# if we successfully invoked this web service, then clear the unsent message cache.
@unsent_timeslice_data = {}
@last_harvest_time = now
@@ -538,18 +531,16 @@
rescue Timeout::Error
log.warn "Timed out trying to post data to RPM (timeout = #{@request_timeout} seconds)" unless @request_timeout < 30
raise
end
if response.is_a? Net::HTTPServiceUnavailable
- log.debug(response.body || response.message)
- raise IgnoreSilentlyException
+ raise ServerConnectionException, "Service unavailable: #{response.body || response.message}"
elsif response.is_a? Net::HTTPGatewayTimeOut
log.debug("Timed out getting response: #{response.message}")
raise Timeout::Error, response.message
elsif !(response.is_a? Net::HTTPSuccess)
- log.debug "Unexpected response from server: #{response.code}: #{response.message}"
- raise IgnoreSilentlyException
+ raise ServerConnectionException, "Unexpected response from server: #{response.code}: #{response.message}"
end
response
end
def decompress_response(response)
@@ -585,34 +576,23 @@
# raises the right exception if the remote server tells it to die
return check_for_exception(response)
rescue ForceRestartException => e
log.info e.message
raise
- rescue ForceDisconnectException => e
- log.error "RPM forced this agent to disconnect (#{e.message})\n" \
- "Restart this process to resume monitoring via rpm.newrelic.com."
- # when a disconnect is requested, stop the current thread, which
- # is the worker thread that gathers data and talks to the
- # server.
- @connected = false
- Thread.exit
rescue SystemCallError, SocketError => e
# These include Errno connection errors
- log.debug "Recoverable error connecting to the server: #{e}"
- raise IgnoreSilentlyException
+ raise ServerConnectionException, "Recoverable error connecting to the server: #{e}"
end
def graceful_disconnect
if @connected
begin
log.debug "Sending graceful shutdown message to #{control.server}"
@request_timeout = 5
-
+ log.debug "Flushing unsent metric data to server"
+ @task_loop.run_task
log.debug "Sending RPM service agent run shutdown message"
- harvest_and_send_timeslice_data
-# harvest_and_send_slowest_sample
- harvest_and_send_errors
invoke_remote :shutdown, @agent_id, Time.now.to_f
log.debug "Graceful shutdown complete"
rescue Timeout::Error, StandardError