lib/new_relic/agent/agent.rb in newrelic_rpm-2.12.3 vs lib/new_relic/agent/agent.rb in newrelic_rpm-2.13.0.beta3
- old
+ new
@@ -1,668 +1,736 @@
require 'socket'
-require 'net/https'
+require 'net/https'
require 'net/http'
require 'logger'
require 'zlib'
require 'stringio'
-# The NewRelic Agent collects performance data from ruby applications
-# in realtime as the application runs, and periodically sends that
-# data to the NewRelic server.
module NewRelic
module Agent
-
- # The Agent is a singleton that is instantiated when the plugin is
- # activated.
- class Agent
-
- # Specifies the version of the agent's communication protocol with
- # the NewRelic hosted site.
-
- PROTOCOL_VERSION = 8
- # 14105: v8 (tag 2.10.3)
- # (no v7)
- # 10379: v6 (not tagged)
- # 4078: v5 (tag 2.5.4)
- # 2292: v4 (tag 2.3.6)
- # 1754: v3 (tag 2.3.0)
- # 534: v2 (shows up in 2.1.0, our first tag)
-
-
- attr_reader :obfuscator
- attr_reader :stats_engine
- attr_reader :transaction_sampler
- attr_reader :error_collector
- attr_reader :record_sql
- attr_reader :histogram
- attr_reader :metric_ids
-
- # Should only be called by NewRelic::Control
- def self.instance
- @instance ||= self.new
- end
- # This method is deprecated. Use NewRelic::Agent.manual_start
- def manual_start(ignored=nil, also_ignored=nil)
- raise "This method no longer supported. Instead use the class method NewRelic::Agent.manual_start"
- end
-
- # This method should be called in a forked process after a fork.
- # It assumes the parent process initialized the agent, but does
- # not assume the agent started.
- #
- # * It clears any metrics carried over from the parent process
- # * Restarts the sampler thread if necessary
- # * Initiates a new agent run and worker loop unless that was done
- # in the parent process and +:force_reconnect+ is not true
- #
- # Options:
- # * <tt>:force_reconnect => true</tt> to force the spawned process to
- # establish a new connection, such as when forking a long running process.
- # The default is false--it will only connect to the server if the parent
- # had not connected.
- # * <tt>:keep_retrying => false</tt> if we try to initiate a new
- # connection, this tells me to only try it once so this method returns
- # quickly if there is some kind of latency with the server.
- def after_fork(options={})
-
- # @connected gets false after we fail to connect or have an error
- # connecting. @connected has nil if we haven't finished trying to connect.
- # or we didn't attempt a connection because this is the master process
- log.debug "Agent received after_fork notice in #$$: [#{control.agent_enabled?}; monitor=#{control.monitor_mode?}; connected: #{@connected.inspect}; thread=#{@worker_thread.inspect}]"
- return if !control.agent_enabled? or
- !control.monitor_mode? or
- @connected == false or
- @worker_thread && @worker_thread.alive?
+ # The Agent is a singleton that is instantiated when the plugin is
+ # activated. It collects performance data from ruby applications
+ # in realtime as the application runs, and periodically sends that
+ # data to the NewRelic server.
+ class Agent
- log.info "Starting the worker thread in #$$ after forking."
+ # Specifies the version of the agent's communication protocol with
+ # the NewRelic hosted site.
- # Clear out stats that are left over from parent process
- reset_stats
-
- start_worker_thread(options)
- @stats_engine.start_sampler_thread
- end
-
- # True if we have initialized and completed 'start'
- def started?
- @started
- end
-
- # Attempt a graceful shutdown of the agent.
- def shutdown
- return if not started?
- if @worker_loop
- @worker_loop.stop
+ PROTOCOL_VERSION = 8
+ # 14105: v8 (tag 2.10.3)
+ # (no v7)
+ # 10379: v6 (not tagged)
+ # 4078: v5 (tag 2.5.4)
+ # 2292: v4 (tag 2.3.6)
+ # 1754: v3 (tag 2.3.0)
+ # 534: v2 (shows up in 2.1.0, our first tag)
+
+
+ def initialize
- log.debug "Starting Agent shutdown"
-
- # if litespeed, then ignore all future SIGUSR1 - it's
- # litespeed trying to shut us down
-
- if control.dispatcher == :litespeed
- Signal.trap("SIGUSR1", "IGNORE")
- Signal.trap("SIGTERM", "IGNORE")
- end
-
- begin
- graceful_disconnect
- rescue => e
- log.error e
- log.error e.backtrace.join("\n")
- end
- end
- @started = nil
- end
-
- def start_transaction
- @stats_engine.start_transaction
- end
-
- def end_transaction
- @stats_engine.end_transaction
- end
-
- def set_record_sql(should_record)
- prev = Thread::current[:record_sql]
- Thread::current[:record_sql] = should_record
- prev.nil? || prev
- end
-
- def set_record_tt(should_record)
- prev = Thread::current[:record_tt]
- Thread::current[:record_tt] = should_record
- prev.nil? || prev
- end
- # Push flag indicating whether we should be tracing in this
- # thread.
- def push_trace_execution_flag(should_trace=false)
- (Thread.current[:newrelic_untraced] ||= []) << should_trace
- end
+ @launch_time = Time.now
- # Pop the current trace execution status. Restore trace execution status
- # to what it was before we pushed the current flag.
- def pop_trace_execution_flag
- Thread.current[:newrelic_untraced].pop if Thread.current[:newrelic_untraced]
- end
-
- def set_sql_obfuscator(type, &block)
- if type == :before
- @obfuscator = NewRelic::ChainedCall.new(block, @obfuscator)
- elsif type == :after
- @obfuscator = NewRelic::ChainedCall.new(@obfuscator, block)
- elsif type == :replace
- @obfuscator = block
- else
- fail "unknown sql_obfuscator type #{type}"
+ @metric_ids = {}
+ @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10)
+ @stats_engine = NewRelic::Agent::StatsEngine.new
+ @transaction_sampler = NewRelic::Agent::TransactionSampler.new
+ @stats_engine.transaction_sampler = @transaction_sampler
+ @error_collector = NewRelic::Agent::ErrorCollector.new
+
+ @request_timeout = NewRelic::Control.instance.fetch('timeout', 2 * 60)
+
+ @last_harvest_time = Time.now
+ @obfuscator = method(:default_sql_obfuscator)
end
- end
- def log
- NewRelic::Agent.logger
- end
-
- # Start up the agent. This verifies that the agent_enabled? is
- # true and initializes the sampler based on the current
- # configuration settings. Then it will fire up the background
- # thread for sending data to the server if applicable.
- def start
- if started?
- control.log! "Agent Started Already!", :error
- return
+ module ClassMethods
+ # Should only be called by NewRelic::Control
+ def instance
+ @instance ||= self.new
+ end
end
- return if !control.agent_enabled?
- @started = true
- @local_host = determine_host
-
- if control.dispatcher.nil? || control.dispatcher.to_s.empty?
- log.info "No dispatcher detected."
- else
- log.info "Dispatcher: #{control.dispatcher.to_s}"
- end
- log.info "Application: #{control.app_names.join(", ")}" unless control.app_names.empty?
-
- sampler_config = control.fetch('transaction_tracer', {})
- @should_send_samples = sampler_config.fetch('enabled', true)
- log.info "Transaction tracing not enabled." if not @should_send_samples
-
- @record_sql = sampler_config.fetch('record_sql', :obfuscated).to_sym
-
- # use transaction_threshold: 4.0 to force the TT collection
- # threshold to 4 seconds
- # use transaction_threshold: apdex_f to use your apdex t value
- # multiplied by 4
- # undefined transaction_threshold defaults to 2.0
- apdex_f = 4 * NewRelic::Control.instance.apdex_t
- @slowest_transaction_threshold = sampler_config.fetch('transaction_threshold', 2.0)
- if @slowest_transaction_threshold =~ /apdex_f/i
- @slowest_transaction_threshold = apdex_f
- end
- @slowest_transaction_threshold = @slowest_transaction_threshold.to_f
-
- @explain_threshold = sampler_config.fetch('explain_threshold', 0.5).to_f
- @explain_enabled = sampler_config.fetch('explain_enabled', true)
- @random_sample = sampler_config.fetch('random_sample', false)
- log.warn "Agent is configured to send raw SQL to RPM service" if @record_sql == :raw
- # Initialize transaction sampler
- @transaction_sampler.random_sampling = @random_sample
- case
- when !control.monitor_mode?
- log.warn "Agent configured not to send data in this environment - edit newrelic.yml to change this"
- when !control.license_key
- log.error "No license key found. Please edit your newrelic.yml file and insert your license key."
- when control.license_key.length != 40
- log.error "Invalid license key: #{control.license_key}"
- when [:passenger, :unicorn].include?(control.dispatcher)
- log.info "Connecting workers after forking."
- else
- # Do the connect in the foreground if we are in sync mode
- NewRelic::Agent.disable_all_tracing { connect(:keep_retrying => false) } if control.sync_startup
+ module InstanceMethods
+
+ attr_reader :obfuscator
+ attr_reader :stats_engine
+ attr_reader :transaction_sampler
+ attr_reader :error_collector
+ attr_reader :record_sql
+ attr_reader :histogram
+ attr_reader :metric_ids
+ attr_reader :url_rules
+
+ def record_transaction(duration_seconds, options={})
+ is_error = options['is_error'] || options['error_message'] || options['exception']
+ metric = options['metric']
+ metric ||= options['uri'] # normalize this with url rules
+ raise "metric or uri arguments required" unless metric
+ metric_info = NewRelic::MetricParser.for_metric_named(metric)
+
+ if metric_info.is_web_transaction?
+ NewRelic::Agent::Instrumentation::MetricFrame.record_apdex(metric_info, duration_seconds, duration_seconds, is_error)
+ histogram.process(duration_seconds)
+ end
+ metrics = metric_info.summary_metrics
+
+ metrics << metric
+ metrics.each do |name|
+ stats = stats_engine.get_stats_no_scope(name)
+ stats.record_data_point(duration_seconds)
+ end
+
+ if is_error
+ if error_message
+ e = Exception.new error_message if error_message
+ error_collector.notice_error e, :uri => uri, :metric => uri
+ end
+ end
+ # busy time ?
+ end
+
+ # This method is deprecated. Use NewRelic::Agent.manual_start
+ def manual_start(ignored=nil, also_ignored=nil)
+ raise "This method no longer supported. Instead use the class method NewRelic::Agent.manual_start"
+ end
+
+ # This method should be called in a forked process after a fork.
+ # It assumes the parent process initialized the agent, but does
+ # not assume the agent started.
+ #
+ # The call is idempotent, but not re-entrant.
+ #
+ # * It clears any metrics carried over from the parent process
+ # * Restarts the sampler thread if necessary
+ # * Initiates a new agent run and worker loop unless that was done
+ # in the parent process and +:force_reconnect+ is not true
+ #
+ # Options:
+ # * <tt>:force_reconnect => true</tt> to force the spawned process to
+ # establish a new connection, such as when forking a long running process.
+ # The default is false--it will only connect to the server if the parent
+ # had not connected.
+ # * <tt>:keep_retrying => false</tt> if we try to initiate a new
+ # connection, this tells me to only try it once so this method returns
+ # quickly if there is some kind of latency with the server.
+ def after_fork(options={})
+
+ # @connected gets false after we fail to connect or have an error
+ # connecting. @connected has nil if we haven't finished trying to connect.
+ # or we didn't attempt a connection because this is the master process
+
+ # log.debug "Agent received after_fork notice in #$$: [#{control.agent_enabled?}; monitor=#{control.monitor_mode?}; connected: #{@connected.inspect}; thread=#{@worker_thread.inspect}]"
+ return if !control.agent_enabled? or
+ !control.monitor_mode? or
+ @connected == false or
+ @worker_thread && @worker_thread.alive?
+
+ log.info "Starting the worker thread in #$$ after forking."
+
+ # Clear out stats that are left over from parent process
+ reset_stats
+
+ # Don't ever check to see if this is a spawner. If we're in a forked process
+ # I'm pretty sure we're not also forking new instances.
+ start_worker_thread(options)
+ @stats_engine.start_sampler_thread
+ end
+
+ # True once 'start' has been entered with the agent enabled (the flag is
+ # set at the top of 'start', before the rest of its setup runs).
+ # 'shutdown' sets it back to nil.
+ def started?
+ @started
+ end
- # Start the event loop and initiate connection if necessary
- start_worker_thread
-
- # Our shutdown handler needs to run after other shutdown handlers
- # that may be doing things like running the app (hello sinatra).
- if RUBY_VERSION =~ /rubinius/i
- list = at_exit { shutdown }
- # move the shutdown handler to the front of the list, to
- # execute last:
- list.unshift(list.pop)
- elsif !defined?(JRuby) or !defined?(Sinatra::Application)
- at_exit { at_exit { shutdown } }
+ # Return nil if not yet connected, true if successfully started
+ # and false if we failed to start.
+ def connected?
+ @connected
end
- end
- control.log! "New Relic RPM Agent #{NewRelic::VERSION::STRING} Initialized: pid = #$$"
- control.log! "Agent Log found in #{NewRelic::Control.instance.log_file}" if NewRelic::Control.instance.log_file
- end
-
- # Clear out the metric data, errors, and transaction traces. Reset the histogram data.
- def reset_stats
- @stats_engine.reset_stats
- @unsent_errors = []
- @traces = nil
- @unsent_timeslice_data = {}
- @last_harvest_time = Time.now
- @launch_time = Time.now
- @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10)
- end
- private
- def collector
- @collector ||= control.server
- end
-
- # Try to launch the worker thread and connect to the server.
- #
- # See #connect for a description of connection_options.
- def start_worker_thread(connection_options = {})
- log.debug "Creating RPM worker thread."
- @worker_thread = Thread.new do
- begin
- NewRelic::Agent.disable_all_tracing do
- # We try to connect. If this returns false that means
- # the server rejected us for a licensing reason and we should
- # just exit the thread. If it returns nil
- # that means it didn't try to connect because we're in the master.
- connect(connection_options)
- if @connected
- # disable transaction sampling if disabled by the server and we're not in dev mode
- if !control.developer_mode? && !@should_send_samples
- @transaction_sampler.disable
+ # Attempt a graceful shutdown of the agent.
+ def shutdown
+ return if not started?
+ if @worker_loop
+ @worker_loop.stop
+
+ log.debug "Starting Agent shutdown"
+
+ # if litespeed, then ignore all future SIGUSR1 - it's
+ # litespeed trying to shut us down
+
+ if control.dispatcher == :litespeed
+ Signal.trap("SIGUSR1", "IGNORE")
+ Signal.trap("SIGTERM", "IGNORE")
+ end
+
+ begin
+ NewRelic::Agent.disable_all_tracing do
+ graceful_disconnect
end
- log.info "Reporting performance data every #{@report_period} seconds."
- log.debug "Running worker loop"
- # note if the agent attempts to report more frequently than allowed by the server
- # the server will start dropping data.
- @worker_loop = WorkerLoop.new
- @worker_loop.run(@report_period) do
- harvest_and_send_timeslice_data
- harvest_and_send_slowest_sample if @should_send_samples
- harvest_and_send_errors if error_collector.enabled
+ rescue => e
+ log.error e
+ log.error e.backtrace.join("\n")
+ end
+ end
+ @started = nil
+ end
+
+ def start_transaction
+ @stats_engine.start_transaction
+ end
+
+ def end_transaction
+ @stats_engine.end_transaction
+ end
+
+ def set_record_sql(should_record)
+ prev = Thread::current[:record_sql]
+ Thread::current[:record_sql] = should_record
+ prev.nil? || prev
+ end
+
+ def set_record_tt(should_record)
+ prev = Thread::current[:record_tt]
+ Thread::current[:record_tt] = should_record
+ prev.nil? || prev
+ end
+ # Push flag indicating whether we should be tracing in this
+ # thread.
+ def push_trace_execution_flag(should_trace=false)
+ (Thread.current[:newrelic_untraced] ||= []) << should_trace
+ end
+
+ # Pop the current trace execution status. Restore trace execution status
+ # to what it was before we pushed the current flag.
+ def pop_trace_execution_flag
+ Thread.current[:newrelic_untraced].pop if Thread.current[:newrelic_untraced]
+ end
+
+ def set_sql_obfuscator(type, &block)
+ if type == :before
+ @obfuscator = NewRelic::ChainedCall.new(block, @obfuscator)
+ elsif type == :after
+ @obfuscator = NewRelic::ChainedCall.new(@obfuscator, block)
+ elsif type == :replace
+ @obfuscator = block
+ else
+ fail "unknown sql_obfuscator type #{type}"
+ end
+ end
+
+ # Shorthand for the agent-wide logger, NewRelic::Agent.logger.
+ def log
+ NewRelic::Agent.logger
+ end
+
+ # Start up the agent. This verifies that the agent_enabled? is
+ # true and initializes the sampler based on the current
+ # configuration settings. Then it will fire up the background
+ # thread for sending data to the server if applicable.
+ #
+ # Calling it again once started only logs an error. Nothing is raised for
+ # a missing or malformed license key; the problem is logged and no worker
+ # thread is started.
+ def start
+   if started?
+     control.log! "Agent Started Already!", :error
+     return
+   end
+   return if !control.agent_enabled?
+   @started = true
+   @local_host = determine_host
+
+   if control.dispatcher.nil? || control.dispatcher.to_s.empty?
+     log.info "No dispatcher detected."
+   else
+     log.info "Dispatcher: #{control.dispatcher.to_s}"
+   end
+   log.info "Application: #{control.app_names.join(", ")}" unless control.app_names.empty?
+
+   sampler_config = control.fetch('transaction_tracer', {})
+   # TODO: Should move this state into the transaction sampler instance
+   @should_send_samples = @config_should_send_samples = sampler_config.fetch('enabled', true)
+   @should_send_random_samples = sampler_config.fetch('random_sample', false)
+   @explain_threshold = sampler_config.fetch('explain_threshold', 0.5).to_f
+   @explain_enabled = sampler_config.fetch('explain_enabled', true)
+   @record_sql = sampler_config.fetch('record_sql', :obfuscated).to_sym
+
+   # use transaction_threshold: 4.0 to force the TT collection
+   # threshold to 4 seconds
+   # use transaction_threshold: apdex_f to use your apdex t value
+   # multiplied by 4
+   # undefined transaction_threshold defaults to 2.0
+   apdex_f = 4 * NewRelic::Control.instance.apdex_t
+   @slowest_transaction_threshold = sampler_config.fetch('transaction_threshold', 2.0)
+   # The setting may be a number (including the Float default) or the string
+   # "apdex_f". Match on its string form so the regexp test never depends on
+   # a numeric value responding to =~.
+   if @slowest_transaction_threshold.to_s =~ /apdex_f/i
+     @slowest_transaction_threshold = apdex_f
+   end
+   @slowest_transaction_threshold = @slowest_transaction_threshold.to_f
+
+   log.warn "Agent is configured to send raw SQL to RPM service" if @record_sql == :raw
+
+   case
+   when !control.monitor_mode?
+     log.warn "Agent configured not to send data in this environment - edit newrelic.yml to change this"
+   when !control.license_key
+     log.error "No license key found. Please edit your newrelic.yml file and insert your license key."
+   when control.license_key.length != 40
+     log.error "Invalid license key: #{control.license_key}"
+   when [:passenger, :unicorn].include?(control.dispatcher)
+     # Under these dispatchers the connection is left to the forked workers.
+     log.info "Connecting workers after forking."
+   else
+     # Do the connect in the foreground if we are in sync mode
+     NewRelic::Agent.disable_all_tracing { connect(:keep_retrying => false) } if control.sync_startup
+
+     # Start the event loop and initiate connection if necessary
+     start_worker_thread
+
+     # Our shutdown handler needs to run after other shutdown handlers
+     # that may be doing things like running the app (hello sinatra).
+     if control.send_data_on_exit
+       # NOTE(review): RUBY_VERSION normally holds only a version number, so
+       # this rubinius test presumably never matches - confirm before
+       # relying on the first branch.
+       if RUBY_VERSION =~ /rubinius/i
+         list = at_exit { shutdown }
+         # move the shutdown handler to the front of the list, to
+         # execute last:
+         list.unshift(list.pop)
+       elsif !defined?(JRuby) or !defined?(Sinatra::Application)
+         # Registering from inside an at_exit block defers shutdown until
+         # the handlers already registered are running.
+         at_exit { at_exit { shutdown } }
        end
+     end
+   end
+   log.info "New Relic RPM Agent #{NewRelic::VERSION::STRING} Initialized: pid = #$$"
+   log.info "Agent Log found in #{NewRelic::Control.instance.log_file}" if NewRelic::Control.instance.log_file
+ end
+
+ # Clear out the metric data, errors, and transaction traces. Reset the histogram data.
+ def reset_stats
+ @stats_engine.reset_stats
+ @unsent_errors = []
+ @traces = nil
+ @unsent_timeslice_data = {}
+ @last_harvest_time = Time.now
+ @launch_time = Time.now
+ @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10)
+ end
+
+ private
+ def collector
+ @collector ||= control.server
+ end
+
+ # Try to launch the worker thread and connect to the server.
+ #
+ # See #connect for a description of connection_options.
+ #
+ # The thread is kept in @worker_thread and labelled 'Worker Loop'. Once
+ # connected it runs the harvest loop every @report_period seconds. When the
+ # thread ends because of an error, @connected is left false.
+ def start_worker_thread(connection_options = {})
+ log.debug "Creating RPM worker thread."
+ @worker_thread = Thread.new do
+ begin
+ NewRelic::Agent.disable_all_tracing do
+ # We try to connect. If this returns false that means
+ # the server rejected us for a licensing reason and we should
+ # just exit the thread. If it returns nil
+ # that means it didn't try to connect because we're in the master.
+ connect(connection_options)
+ if @connected
+ # disable transaction sampling if disabled by the server and we're not in dev mode
+ if !control.developer_mode? && !@should_send_samples
+ @transaction_sampler.disable
+ else
+ @transaction_sampler.enable # otherwise ensure TT's are enabled
+ end
+ log.info "Reporting performance data every #{@report_period} seconds."
+ log.debug "Running worker loop"
+ # Note if the agent attempts to report more frequently than allowed by the server
+ # the server will start dropping data.
+ @worker_loop = WorkerLoop.new
+ @worker_loop.run(@report_period) do
+ harvest_and_send_timeslice_data
+ harvest_and_send_slowest_sample if @should_send_samples
+ harvest_and_send_errors if error_collector.enabled
+ end
+ else
+ log.debug "No connection. Worker thread finished."
+ end
+ end
+ rescue NewRelic::Agent::ForceRestartException => e
+ log.info e.message
+ # disconnect and start over.
+ # clear the stats engine
+ reset_stats
+ @metric_ids = {}
+ @connected = nil
+ # Wait a short time before trying to reconnect
+ sleep 30
+ # retry re-enters the begin block above, so the thread reconnects; there
+ # is no upper bound on how many restarts are honoured.
+ retry
+ rescue NewRelic::Agent::ForceDisconnectException => e
+ # when a disconnect is requested, stop the current thread, which
+ # is the worker thread that gathers data and talks to the
+ # server.
+ log.error "RPM forced this agent to disconnect (#{e.message})"
+ @connected = false
+ rescue NewRelic::Agent::ServerConnectionException => e
+ log.error "Unable to establish connection with the server. Run with log level set to debug for more information."
+ log.debug("#{e.class.name}: #{e.message}\n#{e.backtrace.first}")
+ @connected = false
+ # Catch-all: this rescues Exception, not just StandardError, so anything
+ # else raised in the thread is logged here before the thread ends.
+ rescue Exception => e
+ log.error "Terminating worker loop: #{e.class.name}: #{e}\n #{e.backtrace.join("\n ")}"
+ @connected = false
+ end # begin
+ end # thread new
+ @worker_thread['newrelic_label'] = 'Worker Loop'
+ end
+
+ # Shorthand for the NewRelic::Control singleton.
+ def control
+ NewRelic::Control.instance
+ end
+
+ # Connect to the server and validate the license. If successful,
+ # @connected has true when finished. If not successful, you can
+ # keep calling this. Return false if we could not establish a
+ # connection with the server and we should not retry, such as if
+ # there's a bad license key.
+ #
+ # Set keep_retrying=false to disable retrying and return asap, such as when
+ # invoked in the foreground. Otherwise this runs until a successful
+ # connection is made, or the server rejects us.
+ #
+ # * <tt>:keep_retrying => false</tt> to only try to connect once, and
+ # return with the connection set to nil. This ensures we may try again
+ # later (default true).
+ # * <tt>force_reconnect => true</tt> if you want to establish a new connection
+ # to the server before running the worker loop. This means you get a separate
+ # agent run and RPM sees it as a separate instance (default is false).
+ def connect(options)
+ # Don't proceed if we already connected (@connected=true) or if we tried
+ # to connect and were rejected with prejudice because of a license issue
+ # (@connected=false).
+ return if !@connected.nil? && !options[:force_reconnect]
+ keep_retrying = options[:keep_retrying].nil? || options[:keep_retrying]
+
+ # wait a few seconds for the web server to boot, necessary in development
+ connect_retry_period = keep_retrying ? 10 : 0
+ connect_attempts = 0
+ @agent_id = nil
+ begin
+ sleep connect_retry_period.to_i
+ log.debug "Connecting Process to RPM: #$0"
+ host = invoke_remote(:get_redirect_host)
+ @collector = control.server_from_host(host) if host
+ environment = control['send_environment_info'] != false ? control.local_env.snapshot : []
+ log.debug "Connecting with validation seed/token: #{control.validate_seed}/#{control.validate_token}" if control.validate_seed
+ connect_data = invoke_remote :connect,
+ :pid => $$,
+ :host => @local_host,
+ :app_name => control.app_names,
+ :language => 'ruby',
+ :agent_version => NewRelic::VERSION::STRING,
+ :environment => environment,
+ :settings => control.settings,
+ :validate => {:seed => control.validate_seed,
+ :token => control.validate_token }
+
+ @agent_id = connect_data['agent_run_id']
+ @report_period = connect_data['data_report_period']
+ @url_rules = connect_data['url_rules']
+
+ control.log! "Connected to NewRelic Service at #{@collector}"
+ log.debug "Agent Run = #{@agent_id}."
+ log.debug "Connection data = #{connect_data.inspect}"
+
+ # Ask the server for permission to send transaction samples.
+ # determined by subscription license.
+ @should_send_samples = @config_should_send_samples && connect_data['collect_traces']
+
+ if @should_send_samples
+ if @should_send_random_samples
+ @transaction_sampler.random_sampling = true
+ @transaction_sampler.sampling_rate = connect_data['sampling_rate']
+ log.info "Transaction sampling enabled, rate = #{@transaction_sampler.sampling_rate}"
+ end
+ log.debug "Transaction tracing threshold is #{@slowest_transaction_threshold} seconds."
else
- log.debug "No connection. Worker thread finished."
+ log.debug "Transaction traces will not be sent to the RPM service."
end
+
+ # Ask for permission to collect error data
+ error_collector.enabled = error_collector.config_enabled && connect_data['collect_errors']
+
+ log.debug "Errors will be sent to the RPM service." if error_collector.enabled
+
+ @connected_pid = $$
+ @connected = true
+
+ rescue NewRelic::Agent::LicenseException => e
+ log.error e.message
+ log.info "Visit NewRelic.com to obtain a valid license key, or to upgrade your account."
+ @connected = false
+
+ rescue Timeout::Error, StandardError => e
+ if e.instance_of? NewRelic::Agent::ServerConnectionException
+ log.info "Unable to establish connection with New Relic RPM Service at #{control.server}: #{e.message}"
+ log.debug e.backtrace.join("\n")
+ else
+ log.error "Error establishing connection with New Relic RPM Service at #{control.server}: #{e.message}"
+ log.debug e.backtrace.join("\n")
+ end
+ # retry logic
+ if keep_retrying
+ connect_attempts += 1
+ case connect_attempts
+ when 1..2
+ connect_retry_period, period_msg = 60, "1 minute"
+ when 3..5
+ connect_retry_period, period_msg = 60 * 2, "2 minutes"
+ else
+ connect_retry_period, period_msg = 5 * 60, "5 minutes"
+ end
+ log.info "Will re-attempt in #{period_msg}"
+ retry
+ else
+ @connected = nil
+ end
end
- rescue NewRelic::Agent::ForceRestartException => e
- log.info e.message
- # disconnect and start over.
- # clear the stats engine
- reset_stats
- @metric_ids = {}
- @connected = nil
- # Wait a short time before trying to reconnect
- sleep 30
- retry
- rescue NewRelic::Agent::ForceDisconnectException => e
- # when a disconnect is requested, stop the current thread, which
- # is the worker thread that gathers data and talks to the
- # server.
- log.error "RPM forced this agent to disconnect (#{e.message})"
- @connected = false
- rescue NewRelic::Agent::ServerConnectionException => e
- control.log! "Unable to establish connection with the server. Run with log level set to debug for more information."
- log.debug("#{e.class.name}: #{e.message}\n#{e.backtrace.first}")
- @connected = false
- rescue Exception => e
- log.error "Terminating worker loop: #{e.class.name}: #{e}\n #{e.backtrace.join("\n ")}"
- @connected = false
- end # begin
- end # thread new
- @worker_thread['newrelic_label'] = 'Worker Loop'
- end
-
- def control
- NewRelic::Control.instance
- end
-
- def initialize
- @launch_time = Time.now
-
- @metric_ids = {}
- @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10)
- @stats_engine = NewRelic::Agent::StatsEngine.new
- @transaction_sampler = NewRelic::Agent::TransactionSampler.new
- @stats_engine.transaction_sampler = @transaction_sampler
- @error_collector = NewRelic::Agent::ErrorCollector.new
-
- @request_timeout = NewRelic::Control.instance.fetch('timeout', 2 * 60)
-
- @last_harvest_time = Time.now
- @obfuscator = method(:default_sql_obfuscator)
- end
-
- # Connect to the server and validate the license. If successful,
- # @connected has true when finished. If not successful, you can
- # keep calling this. Return false if we could not establish a
- # connection with the server and we should not retry, such as if
- # there's a bad license key.
- #
- # Set keep_retrying=false to disable retrying and return asap, such as when
- # invoked in the foreground. Otherwise this runs until a successful
- # connection is made, or the server rejects us.
- #
- # * <tt>:keep_retrying => false</tt> to only try to connect once, and
- # return with the connection set to nil. This ensures we may try again
- # later (default true).
- # * <tt>force_reconnect => true</tt> if you want to establish a new connection
- # to the server before running the worker loop. This means you get a separate
- # agent run and RPM sees it as a separate instance (default is false).
- def connect(options)
- # Don't proceed if we already connected (@connected=true) or if we tried
- # to connect and were rejected with prejudice because of a license issue
- # (@connected=false).
- return if !@connected.nil? && !options[:force_reconnect]
-
- keep_retrying = options[:keep_retrying].nil? || options[:keep_retrying]
-
- # wait a few seconds for the web server to boot, necessary in development
- connect_retry_period = keep_retrying ? 10 : 0
- connect_attempts = 0
- @agent_id = nil
- begin
- sleep connect_retry_period.to_i
- environment = control['send_environment_info'] != false ? control.local_env.snapshot : []
- log.debug "Connecting with validation seed/token: #{control.validate_seed}/#{control.validate_token}" if control.validate_seed
- @agent_id ||= invoke_remote :start, @local_host, {
- :pid => $$,
- :launch_time => @launch_time.to_f,
- :agent_version => NewRelic::VERSION::STRING,
- :environment => environment,
- :settings => control.settings,
- :validate_seed => control.validate_seed,
- :validate_token => control.validate_token }
-
- host = invoke_remote(:get_redirect_host)
-
- @collector = control.server_from_host(host) if host
-
- @report_period = invoke_remote :get_data_report_period, @agent_id
-
- control.log! "Connected to NewRelic Service at #{@collector}"
- log.debug "Agent ID = #{@agent_id}."
-
- # Ask the server for permission to send transaction samples.
- # determined by subscription license.
- @should_send_samples &&= invoke_remote :should_collect_samples, @agent_id
-
- if @should_send_samples
- sampling_rate = invoke_remote :sampling_rate, @agent_id if @random_sample
- @transaction_sampler.sampling_rate = sampling_rate
- log.info "Transaction sample rate: #{@transaction_sampler.sampling_rate}" if sampling_rate
- log.info "Transaction tracing threshold is #{@slowest_transaction_threshold} seconds."
end
+
+ # Returns this machine's host name as reported by Socket.gethostname.
+ def determine_host
+ Socket.gethostname
+ end
+
+ # Returns control.root.
+ def determine_home_directory
+ control.root
+ end
- # Ask for permission to collect error data
- error_collector.enabled &&= invoke_remote(:should_collect_errors, @agent_id)
-
- log.info "Transaction traces will be sent to the RPM service." if @should_send_samples
- log.info "Errors will be sent to the RPM service." if error_collector.enabled
-
- @connected_pid = $$
- @connected = true
-
- rescue NewRelic::Agent::LicenseException => e
- control.log! e.message, :error
- control.log! "Visit NewRelic.com to obtain a valid license key, or to upgrade your account."
- @connected = false
-
- rescue Timeout::Error, StandardError => e
- log.info "Unable to establish connection with New Relic RPM Service at #{control.server}"
- unless e.instance_of? NewRelic::Agent::ServerConnectionException
- log.error e.message
- log.debug e.backtrace.join("\n")
+ # Tests the program name ($0) for "ApplicationSpawner" or a leading
+ # "unicorn... master". Returns the match position, or nil without a match.
+ # Presumably this identifies a process that forks the real workers -
+ # confirm against callers.
+ def is_application_spawner?
+   /ApplicationSpawner|^unicorn\S* master/ =~ $0
end
- # retry logic
- if keep_retrying
- connect_attempts += 1
- case connect_attempts
- when 1..2
- connect_retry_period, period_msg = 60, "1 minute"
- when 3..5
- connect_retry_period, period_msg = 60 * 2, "2 minutes"
- else
- connect_retry_period, period_msg = 5 * 60, "5 minutes"
+
+ # Harvests timeslice metric data from the stats engine and posts it to
+ # the server via invoke_remote(:metric_data, ...), covering the interval
+ # from @last_harvest_time to now. Spec-to-id pairs returned by the
+ # server are stored in @metric_ids. On completion the unsent cache is
+ # emptied and @last_harvest_time is advanced to the harvest time.
+ def harvest_and_send_timeslice_data
+
+ NewRelic::Agent::BusyCalculator.harvest_busy
+
+ now = Time.now
+
+ # Previously unsent data is handed back to the stats engine together
+ # with the newly harvested data.
+ @unsent_timeslice_data ||= {}
+ @unsent_timeslice_data = @stats_engine.harvest_timeslice_data(@unsent_timeslice_data, @metric_ids)
+
+ begin
+ # In this version of the protocol, we get back an assoc array of spec to id.
+ metric_ids = invoke_remote(:metric_data, @agent_id,
+ @last_harvest_time.to_f,
+ now.to_f,
+ @unsent_timeslice_data.values)
+
+ rescue Timeout::Error
+ # assume that the data was received. chances are that it was
+ metric_ids = nil
end
- log.info "Will re-attempt in #{period_msg}"
- retry
- else
- @connected = nil
+
+ # metric_ids is nil after a timeout, in which case nothing is recorded.
+ metric_ids.each do | spec, id |
+ @metric_ids[spec] = id
+ end if metric_ids
+
+ log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@agent_id}) in #{Time.now - now} seconds"
+
+ # if we successfully invoked this web service, then clear the unsent message cache.
+ @unsent_timeslice_data = {}
+ @last_harvest_time = now
+
+ # handle_messages
+
+ # note - exceptions are logged in invoke_remote. If an exception other than
+ # a timeout is raised above, the unsent data is not cleared and is passed
+ # back into harvest_timeslice_data on the next harvest.
end
- end
- end
-
- def determine_host
- Socket.gethostname
- end
-
- def determine_home_directory
- control.root
- end
-
- def harvest_and_send_timeslice_data
-
- NewRelic::Agent::BusyCalculator.harvest_busy
-
- now = Time.now
-
- @unsent_timeslice_data ||= {}
- @unsent_timeslice_data = @stats_engine.harvest_timeslice_data(@unsent_timeslice_data, @metric_ids)
-
- begin
- # In this version of the protocol, we get back an assoc array of spec to id.
- metric_ids = invoke_remote(:metric_data, @agent_id,
- @last_harvest_time.to_f,
- now.to_f,
- @unsent_timeslice_data.values)
-
- rescue Timeout::Error
- # assume that the data was received. chances are that it was
- metric_ids = nil
- end
-
- metric_ids.each do | spec, id |
- @metric_ids[spec] = id
- end if metric_ids
-
- log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@agent_id}) in #{Time.now - now} seconds"
-
- # if we successfully invoked this web service, then clear the unsent message cache.
- @unsent_timeslice_data = {}
- @last_harvest_time = now
-
- # handle_messages
-
- # note - exceptions are logged in invoke_remote. If an exception is encountered here,
- # then the metric data is downsampled for another timeslices
- end
-
- def harvest_and_send_slowest_sample
- @traces = @transaction_sampler.harvest(@traces, @slowest_transaction_threshold)
-
- unless @traces.empty?
- now = Time.now
- log.debug "Sending (#{@traces.length}) transaction traces"
- begin
- # take the traces and prepare them for sending across the
- # wire. This includes gathering SQL explanations, stripping
- # out stack traces, and normalizing SQL. note that we
- # explain only the sql statements whose segments' execution
- # times exceed our threshold (to avoid unnecessary overhead
- # of running explains on fast queries.)
- traces = @traces.collect {|trace| trace.prepare_to_send(:explain_sql => @explain_threshold, :record_sql => @record_sql, :keep_backtraces => true, :explain_enabled => @explain_enabled)}
- invoke_remote :transaction_sample_data, @agent_id, traces
- rescue PostTooBigException
- # we tried to send too much data, drop the first trace and
- # try again
- retry if @traces.shift
+
+ # Harvests transaction traces from the transaction sampler and, when
+ # there are any, prepares them and posts them to the server via
+ # invoke_remote(:transaction_sample_data, ...). If the post is rejected
+ # as too big, the first trace is dropped and the send is retried until
+ # it succeeds or no traces remain. @traces is reset to nil afterwards.
+ def harvest_and_send_slowest_sample
+ @traces = @transaction_sampler.harvest(@traces, @slowest_transaction_threshold)
+
+ unless @traces.empty?
+ now = Time.now
+ log.debug "Sending (#{@traces.length}) transaction traces"
+ begin
+ # take the traces and prepare them for sending across the
+ # wire. This includes gathering SQL explanations, stripping
+ # out stack traces, and normalizing SQL. note that we
+ # explain only the sql statements whose segments' execution
+ # times exceed our threshold (to avoid unnecessary overhead
+ # of running explains on fast queries.)
+ # :record_sql is only passed when recording is not :off, and
+ # :explain_sql only when explains are enabled.
+ options = { :keep_backtraces => true }
+ options[:record_sql] = @record_sql unless @record_sql == :off
+ options[:explain_sql] = @explain_threshold if @explain_enabled
+ traces = @traces.collect {|trace| trace.prepare_to_send(options)}
+ invoke_remote :transaction_sample_data, @agent_id, traces
+ rescue PostTooBigException
+ # we tried to send too much data, drop the first trace and
+ # try again
+ retry if @traces.shift
+ end
+
+ log.debug "Sent slowest sample (#{@agent_id}) in #{Time.now - now} seconds"
+ end
+
+ # if we succeed sending this sample, then we don't need to keep
+ # the slowest sample around - it has been sent already and we
+ # can collect the next one
+ @traces = nil
+
+ # note - exceptions are logged in invoke_remote. If an
+ # exception is encountered here, @traces is not cleared, so the
+ # slowest sample is determined over the entire period since the
+ # last reported sample.
end
-
- log.debug "Sent slowest sample (#{@agent_id}) in #{Time.now - now} seconds"
- end
-
- # if we succeed sending this sample, then we don't need to keep
- # the slowest sample around - it has been sent already and we
- # can collect the next one
- @traces = nil
-
- # note - exceptions are logged in invoke_remote. If an
- # exception is encountered here, then the slowest sample of is
- # determined of the entire period since the last reported
- # sample.
- end
-
- def harvest_and_send_errors
- @unsent_errors = @error_collector.harvest_errors(@unsent_errors)
- if @unsent_errors && @unsent_errors.length > 0
- log.debug "Sending #{@unsent_errors.length} errors"
- begin
- invoke_remote :error_data, @agent_id, @unsent_errors
- rescue PostTooBigException
- @unsent_errors.shift
- retry
+
+ # Harvests errors from the error collector (passing in any previously
+ # unsent errors) and, when there are any, posts them to the server via
+ # invoke_remote(:error_data, ...). If the post is rejected as too big,
+ # the first error is dropped and the send is retried. @unsent_errors
+ # is emptied only after the send completes without raising.
+ def harvest_and_send_errors
+ @unsent_errors = @error_collector.harvest_errors(@unsent_errors)
+ if @unsent_errors && @unsent_errors.length > 0
+ log.debug "Sending #{@unsent_errors.length} errors"
+ begin
+ invoke_remote :error_data, @agent_id, @unsent_errors
+ rescue PostTooBigException
+ # Drop the oldest entry and resend the remainder.
+ # NOTE(review): the retry is unconditional, unlike the
+ # "retry if @traces.shift" guard used for transaction traces.
+ @unsent_errors.shift
+ retry
+ end
+ # if the remote invocation fails, then we never clear
+ # @unsent_errors, and therefore we can re-attempt to send on
+ # the next heartbeat. Note the error collector maxes out at
+ # 20 instances to prevent leakage
+ @unsent_errors = []
+ end
end
- # if the remote invocation fails, then we never clear
- # @unsent_errors, and therefore we can re-attempt to send on
- # the next heartbeat. Note the error collector maxes out at
- # 20 instances to prevent leakage
- @unsent_errors = []
- end
- end
- def compress_data(object)
- dump = Marshal.dump(object)
-
- # this checks to make sure mongrel won't choke on big uploads
- check_post_size(dump)
-
- # we currently optimize for CPU here since we get roughly a 10x
- # reduction in message size with this, and CPU overhead is at a
- # premium. For extra-large posts, we use the higher compression
- # since otherwise it actually errors out.
-
- dump_size = dump.size
-
- # small payloads don't need compression
- return [dump, 'identity'] if dump_size < (64*1024)
-
- # medium payloads get fast compression, to save CPU
- # big payloads get all the compression possible, to stay under
- # the 2,000,000 byte post threshold
- compression = dump_size < 2000000 ? Zlib::BEST_SPEED : Zlib::BEST_COMPRESSION
-
- [Zlib::Deflate.deflate(dump, compression), 'deflate']
- end
-
- def check_post_size(post_string)
- # TODO: define this as a config option on the server side
- return if post_string.size < control.post_size_limit
- log.warn "Tried to send too much data: #{post_string.size} bytes"
- raise PostTooBigException
- end
+ # Serializes object with Marshal.dump and chooses an encoding by size.
+ # Returns a two-element array [payload, encoding]:
+ # * under 64KB: the raw dump with encoding 'identity'
+ # * under 2,000,000 bytes: deflated with Zlib::BEST_SPEED, 'deflate'
+ # * otherwise: deflated with Zlib::BEST_COMPRESSION, 'deflate'
+ # The dump is first passed to check_post_size.
+ def compress_data(object)
+ dump = Marshal.dump(object)
- def send_request(opts)
- request = Net::HTTP::Post.new(opts[:uri], 'CONTENT-ENCODING' => opts[:encoding], 'HOST' => opts[:collector].name)
- request.content_type = "application/octet-stream"
- request.body = opts[:data]
-
- log.debug "Connect to #{opts[:collector]}#{opts[:uri]}"
-
- response = nil
- http = control.http_connection(collector)
- begin
- timeout(@request_timeout) do
- response = http.request(request)
+ # this checks to make sure mongrel won't choke on big uploads
+ check_post_size(dump)
+
+ # we currently optimize for CPU here since we get roughly a 10x
+ # reduction in message size with this, and CPU overhead is at a
+ # premium. For extra-large posts, we use the higher compression
+ # since otherwise it actually errors out.
+
+ dump_size = dump.size
+
+ # Skip compression if content is smaller than 64kb. There are problems
+ # with bugs in Ruby in some versions that expose us to a risk of
+ # segfaults if we compress aggressively.
+ return [dump, 'identity'] if dump_size < (64*1024)
+
+ # medium payloads get fast compression, to save CPU
+ # big payloads get all the compression possible, to stay under
+ # the 2,000,000 byte post threshold
+ compression = dump_size < 2000000 ? Zlib::BEST_SPEED : Zlib::BEST_COMPRESSION
+
+ [Zlib::Deflate.deflate(dump, compression), 'deflate']
end
- rescue Timeout::Error
- log.warn "Timed out trying to post data to RPM (timeout = #{@request_timeout} seconds)" unless @request_timeout < 30
- raise
- end
- if response.is_a? Net::HTTPServiceUnavailable
- raise NewRelic::Agent::ServerConnectionException, "Service unavailable: #{response.body || response.message}"
- elsif response.is_a? Net::HTTPGatewayTimeOut
- log.debug("Timed out getting response: #{response.message}")
- raise Timeout::Error, response.message
- elsif !(response.is_a? Net::HTTPSuccess)
- raise NewRelic::Agent::ServerConnectionException, "Unexpected response from server: #{response.code}: #{response.message}"
- end
- response
- end
- def decompress_response(response)
- if response['content-encoding'] != 'gzip'
- log.debug "Uncompressed content returned"
- return response.body
- end
- log.debug "Decompressing return value"
- i = Zlib::GzipReader.new(StringIO.new(response.body))
- i.read
- end
+ # Size guard for outgoing posts. Returns nil when post_string is
+ # smaller than control.post_size_limit; otherwise logs a warning with
+ # the size in bytes and raises PostTooBigException.
+ def check_post_size(post_string)
+ # TODO: define this as a config option on the server side
+ return if post_string.size < control.post_size_limit
+ log.warn "Tried to send too much data: #{post_string.size} bytes"
+ raise PostTooBigException
+ end
- def check_for_exception(response)
- dump = decompress_response(response)
- value = Marshal.load(dump)
- raise value if value.is_a? Exception
- value
- end
-
- def remote_method_uri(method)
- uri = "/agent_listener/#{PROTOCOL_VERSION}/#{control.license_key}/#{method}"
- uri << "?run_id=#{@agent_id}" if @agent_id
- uri
- end
-
- # send a message via post
- def invoke_remote(method, *args)
- #determines whether to zip the data or send plain
- post_data, encoding = compress_data(args)
-
- response = send_request({:uri => remote_method_uri(method), :encoding => encoding, :collector => collector, :data => post_data})
+ # Posts a payload to the server and returns the Net::HTTP response.
+ #
+ # opts keys read here:
+ # :uri - request path
+ # :encoding - value for the CONTENT-ENCODING header
+ # :collector - its #name is sent as the HOST header
+ # :data - request body
+ #
+ # Raises:
+ # Timeout::Error - the request exceeded @request_timeout, or the
+ # server answered with a gateway timeout
+ # NewRelic::Agent::ServerConnectionException - service unavailable
+ # or any other non-success response
+ # PostTooBigException - the server answered "request entity too large"
+ def send_request(opts)
+ request = Net::HTTP::Post.new(opts[:uri], 'CONTENT-ENCODING' => opts[:encoding], 'HOST' => opts[:collector].name)
+ request.content_type = "application/octet-stream"
+ request.body = opts[:data]
- # raises the right exception if the remote server tells it to die
- return check_for_exception(response)
- rescue NewRelic::Agent::ForceRestartException => e
- log.info e.message
- raise
- rescue SystemCallError, SocketError => e
- # These include Errno connection errors
- raise NewRelic::Agent::ServerConnectionException, "Recoverable error connecting to the server: #{e}"
- end
-
- def graceful_disconnect
- if @connected
- begin
- log.debug "Sending graceful shutdown message to #{control.server}"
-
- @request_timeout = 10
- log.debug "Flushing unsent metric data to server"
- @worker_loop.run_task
- if @connected_pid == $$
- log.debug "Sending RPM service agent run shutdown message"
- invoke_remote :shutdown, @agent_id, Time.now.to_f
+ log.debug "Connect to #{opts[:collector]}#{opts[:uri]}"
+
+ response = nil
+ # NOTE(review): the connection is built from the `collector` method,
+ # not from opts[:collector] -- confirm the two are always the same.
+ http = control.http_connection(collector)
+ # Net::HTTP's own read timeout is cleared; presumably so that only the
+ # TimerLib timeout below bounds the request -- confirm.
+ http.read_timeout = nil
+ begin
+ NewRelic::TimerLib.timeout(@request_timeout) do
+ response = http.request(request)
+ end
+ rescue Timeout::Error
+ # The warning is suppressed for timeouts under 30 seconds.
+ log.warn "Timed out trying to post data to RPM (timeout = #{@request_timeout} seconds)" unless @request_timeout < 30
+ raise
+ end
+ if response.is_a? Net::HTTPServiceUnavailable
+ raise NewRelic::Agent::ServerConnectionException, "Service unavailable (#{response.code}): #{response.message}"
+ elsif response.is_a? Net::HTTPGatewayTimeOut
+ log.debug("Timed out getting response: #{response.message}")
+ raise Timeout::Error, response.message
+ elsif response.is_a? Net::HTTPRequestEntityTooLarge
+ raise PostTooBigException
+ elsif !(response.is_a? Net::HTTPSuccess)
+ raise NewRelic::Agent::ServerConnectionException, "Unexpected response from server (#{response.code}): #{response.message}"
+ end
+ response
+ end
+
+ # Returns the response body as a string. When the response's
+ # content-encoding header is 'gzip' the body is gunzipped first;
+ # for any other value the body is returned unchanged.
+ def decompress_response(response)
+ if response['content-encoding'] != 'gzip'
+ log.debug "Uncompressed content returned"
+ return response.body
+ end
+ log.debug "Decompressing return value"
+ i = Zlib::GzipReader.new(StringIO.new(response.body))
+ i.read
+ end
+
+ # Decodes the response body (decompressing if needed) with Marshal.load.
+ # If the decoded value is an Exception it is raised; otherwise the
+ # decoded value is returned.
+ # NOTE(review): Marshal.load is applied to data received over the
+ # network; this is only safe if the server response can be trusted.
+ def check_for_exception(response)
+ dump = decompress_response(response)
+ value = Marshal.load(dump)
+ raise value if value.is_a? Exception
+ value
+ end
+
+ # Builds the request path for a remote method:
+ # /agent_listener/<PROTOCOL_VERSION>/<license key>/<method>
+ # with "?run_id=<@agent_id>" appended when @agent_id is set.
+ def remote_method_uri(method)
+ uri = "/agent_listener/#{PROTOCOL_VERSION}/#{control.license_key}/#{method}"
+ uri << "?run_id=#{@agent_id}" if @agent_id
+ uri
+ end
+
+ # send a message via post
+ # Serializes args (compressing large payloads), posts them to the URI
+ # for +method+, and returns the decoded server reply.
+ #
+ # Raises:
+ # NewRelic::Agent::ForceRestartException - logged at info level and
+ # re-raised
+ # NewRelic::Agent::ServerConnectionException - raised in place of any
+ # SystemCallError or SocketError
+ # Other exceptions from compress_data, send_request and
+ # check_for_exception propagate unchanged.
+ def invoke_remote(method, *args)
+ #determines whether to zip the data or send plain
+ post_data, encoding = compress_data(args)
+
+ response = send_request({:uri => remote_method_uri(method), :encoding => encoding, :collector => collector, :data => post_data})
+
+ # raises the right exception if the remote server tells it to die
+ return check_for_exception(response)
+ rescue NewRelic::Agent::ForceRestartException => e
+ log.info e.message
+ raise
+ rescue SystemCallError, SocketError => e
+ # These include Errno connection errors
+ raise NewRelic::Agent::ServerConnectionException, "Recoverable error connecting to the server: #{e}"
+ end
+
+ # Best-effort shutdown. When the agent is connected, the request
+ # timeout is lowered to 10, the worker loop task is run once to flush
+ # unsent data, and a :shutdown message is sent -- but only if this
+ # process is the one that connected (@connected_pid == $$).
+ # Timeout::Error and StandardError raised along the way are swallowed
+ # without logging. When not connected, only a debug line is logged.
+ def graceful_disconnect
+ if @connected
+ begin
+ @request_timeout = 10
+ log.debug "Flushing unsent metric data to server"
+ @worker_loop.run_task
+ if @connected_pid == $$
+ log.debug "Sending RPM service agent run shutdown message"
+ invoke_remote :shutdown, @agent_id, Time.now.to_f
+ else
+ log.debug "This agent connected from parent process #{@connected_pid}--not sending shutdown"
+ end
+ log.debug "Graceful disconnect complete"
+ rescue Timeout::Error, StandardError
+ end
else
- log.debug "This agent connected from #{@connected_pid}--not sending shutdown"
+ log.debug "Bypassing graceful disconnect - agent not connected"
end
- log.debug "Graceful disconnect complete"
- rescue Timeout::Error, StandardError
end
- else
- log.debug "Bypassing graceful disconnect - agent not connected"
+ # Returns an obfuscated copy of sql; the argument itself is not
+ # modified. Single-quoted strings, double-quoted strings and every run
+ # of digits are each replaced with "?".
+ def default_sql_obfuscator(sql)
+ sql = sql.dup
+ # This is hardly readable. Use the unit tests.
+ # remove single quoted strings:
+ sql.gsub!(/'(.*?[^\\'])??'(?!')/, '?')
+ # remove double quoted strings:
+ sql.gsub!(/"(.*?[^\\"])??"(?!")/, '?')
+ # replace all number literals
+ # (this matches any digit run, including digits inside identifiers)
+ sql.gsub!(/\d+/, "?")
+ sql
+ end
end
+
+ extend ClassMethods
+ include InstanceMethods
end
- def default_sql_obfuscator(sql)
- sql = sql.dup
- # This is hardly readable. Use the unit tests.
- # remove single quoted strings:
- sql.gsub!(/'(.*?[^\\'])??'(?!')/, '?')
- # remove double quoted strings:
- sql.gsub!(/"(.*?[^\\"])??"(?!")/, '?')
- # replace all number literals
- sql.gsub!(/\d+/, "?")
- sql
- end
end
-
-end
end