lib/new_relic/agent/agent.rb in newrelic_rpm-2.12.3 vs lib/new_relic/agent/agent.rb in newrelic_rpm-2.13.0.beta3

- old
+ new

@@ -1,668 +1,736 @@ require 'socket' -require 'net/https' +require 'net/https' require 'net/http' require 'logger' require 'zlib' require 'stringio' -# The NewRelic Agent collects performance data from ruby applications -# in realtime as the application runs, and periodically sends that -# data to the NewRelic server. module NewRelic module Agent - - # The Agent is a singleton that is instantiated when the plugin is - # activated. - class Agent - - # Specifies the version of the agent's communication protocol with - # the NewRelic hosted site. - - PROTOCOL_VERSION = 8 - # 14105: v8 (tag 2.10.3) - # (no v7) - # 10379: v6 (not tagged) - # 4078: v5 (tag 2.5.4) - # 2292: v4 (tag 2.3.6) - # 1754: v3 (tag 2.3.0) - # 534: v2 (shows up in 2.1.0, our first tag) - - - attr_reader :obfuscator - attr_reader :stats_engine - attr_reader :transaction_sampler - attr_reader :error_collector - attr_reader :record_sql - attr_reader :histogram - attr_reader :metric_ids - - # Should only be called by NewRelic::Control - def self.instance - @instance ||= self.new - end - # This method is deprecated. Use NewRelic::Agent.manual_start - def manual_start(ignored=nil, also_ignored=nil) - raise "This method no longer supported. Instead use the class method NewRelic::Agent.manual_start" - end - - # This method should be called in a forked process after a fork. - # It assumes the parent process initialized the agent, but does - # not assume the agent started. - # - # * It clears any metrics carried over from the parent process - # * Restarts the sampler thread if necessary - # * Initiates a new agent run and worker loop unless that was done - # in the parent process and +:force_reconnect+ is not true - # - # Options: - # * <tt>:force_reconnect => true</tt> to force the spawned process to - # establish a new connection, such as when forking a long running process. - # The default is false--it will only connect to the server if the parent - # had not connected. - # * <tt>:keep_retrying => false</tt> if we try to initiate a new - # connection, this tells me to only try it once so this method returns - # quickly if there is some kind of latency with the server. - def after_fork(options={}) - - # @connected gets false after we fail to connect or have an error - # connecting. @connected has nil if we haven't finished trying to connect. - # or we didn't attempt a connection because this is the master process - log.debug "Agent received after_fork notice in #$$: [#{control.agent_enabled?}; monitor=#{control.monitor_mode?}; connected: #{@connected.inspect}; thread=#{@worker_thread.inspect}]" - return if !control.agent_enabled? or - !control.monitor_mode? or - @connected == false or - @worker_thread && @worker_thread.alive? + # The Agent is a singleton that is instantiated when the plugin is + # activated. It collects performance data from ruby applications + # in realtime as the application runs, and periodically sends that + # data to the NewRelic server. + class Agent - log.info "Starting the worker thread in #$$ after forking." + # Specifies the version of the agent's communication protocol with + # the NewRelic hosted site. - # Clear out stats that are left over from parent process - reset_stats - - start_worker_thread(options) - @stats_engine.start_sampler_thread - end - - # True if we have initialized and completed 'start' - def started? - @started - end - - # Attempt a graceful shutdown of the agent. - def shutdown - return if not started? - if @worker_loop - @worker_loop.stop + PROTOCOL_VERSION = 8 + # 14105: v8 (tag 2.10.3) + # (no v7) + # 10379: v6 (not tagged) + # 4078: v5 (tag 2.5.4) + # 2292: v4 (tag 2.3.6) + # 1754: v3 (tag 2.3.0) + # 534: v2 (shows up in 2.1.0, our first tag) + + + def initialize - log.debug "Starting Agent shutdown" - - # if litespeed, then ignore all future SIGUSR1 - it's - # litespeed trying to shut us down - - if control.dispatcher == :litespeed - Signal.trap("SIGUSR1", "IGNORE") - Signal.trap("SIGTERM", "IGNORE") - end - - begin - graceful_disconnect - rescue => e - log.error e - log.error e.backtrace.join("\n") - end - end - @started = nil - end - - def start_transaction - @stats_engine.start_transaction - end - - def end_transaction - @stats_engine.end_transaction - end - - def set_record_sql(should_record) - prev = Thread::current[:record_sql] - Thread::current[:record_sql] = should_record - prev.nil? || prev - end - - def set_record_tt(should_record) - prev = Thread::current[:record_tt] - Thread::current[:record_tt] = should_record - prev.nil? || prev - end - # Push flag indicating whether we should be tracing in this - # thread. - def push_trace_execution_flag(should_trace=false) - (Thread.current[:newrelic_untraced] ||= []) << should_trace - end + @launch_time = Time.now - # Pop the current trace execution status. Restore trace execution status - # to what it was before we pushed the current flag. - def pop_trace_execution_flag - Thread.current[:newrelic_untraced].pop if Thread.current[:newrelic_untraced] - end - - def set_sql_obfuscator(type, &block) - if type == :before - @obfuscator = NewRelic::ChainedCall.new(block, @obfuscator) - elsif type == :after - @obfuscator = NewRelic::ChainedCall.new(@obfuscator, block) - elsif type == :replace - @obfuscator = block - else - fail "unknown sql_obfuscator type #{type}" + @metric_ids = {} + @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10) + @stats_engine = NewRelic::Agent::StatsEngine.new + @transaction_sampler = NewRelic::Agent::TransactionSampler.new + @stats_engine.transaction_sampler = @transaction_sampler + @error_collector = NewRelic::Agent::ErrorCollector.new + + @request_timeout = NewRelic::Control.instance.fetch('timeout', 2 * 60) + + @last_harvest_time = Time.now + @obfuscator = method(:default_sql_obfuscator) end - end - def log - NewRelic::Agent.logger - end - - # Start up the agent. This verifies that the agent_enabled? is - # true and initializes the sampler based on the current - # configuration settings. Then it will fire up the background - # thread for sending data to the server if applicable. - def start - if started? - control.log! "Agent Started Already!", :error - return + module ClassMethods + # Should only be called by NewRelic::Control + def instance + @instance ||= self.new + end end - return if !control.agent_enabled? - @started = true - @local_host = determine_host - - if control.dispatcher.nil? || control.dispatcher.to_s.empty? - log.info "No dispatcher detected." - else - log.info "Dispatcher: #{control.dispatcher.to_s}" - end - log.info "Application: #{control.app_names.join(", ")}" unless control.app_names.empty? - - sampler_config = control.fetch('transaction_tracer', {}) - @should_send_samples = sampler_config.fetch('enabled', true) - log.info "Transaction tracing not enabled." if not @should_send_samples - - @record_sql = sampler_config.fetch('record_sql', :obfuscated).to_sym - - # use transaction_threshold: 4.0 to force the TT collection - # threshold to 4 seconds - # use transaction_threshold: apdex_f to use your apdex t value - # multiplied by 4 - # undefined transaction_threshold defaults to 2.0 - apdex_f = 4 * NewRelic::Control.instance.apdex_t - @slowest_transaction_threshold = sampler_config.fetch('transaction_threshold', 2.0) - if @slowest_transaction_threshold =~ /apdex_f/i - @slowest_transaction_threshold = apdex_f - end - @slowest_transaction_threshold = @slowest_transaction_threshold.to_f - - @explain_threshold = sampler_config.fetch('explain_threshold', 0.5).to_f - @explain_enabled = sampler_config.fetch('explain_enabled', true) - @random_sample = sampler_config.fetch('random_sample', false) - log.warn "Agent is configured to send raw SQL to RPM service" if @record_sql == :raw - # Initialize transaction sampler - @transaction_sampler.random_sampling = @random_sample - case - when !control.monitor_mode? - log.warn "Agent configured not to send data in this environment - edit newrelic.yml to change this" - when !control.license_key - log.error "No license key found. Please edit your newrelic.yml file and insert your license key." - when control.license_key.length != 40 - log.error "Invalid license key: #{control.license_key}" - when [:passenger, :unicorn].include?(control.dispatcher) - log.info "Connecting workers after forking." - else - # Do the connect in the foreground if we are in sync mode - NewRelic::Agent.disable_all_tracing { connect(:keep_retrying => false) } if control.sync_startup + module InstanceMethods + + attr_reader :obfuscator + attr_reader :stats_engine + attr_reader :transaction_sampler + attr_reader :error_collector + attr_reader :record_sql + attr_reader :histogram + attr_reader :metric_ids + attr_reader :url_rules + + def record_transaction(duration_seconds, options={}) + is_error = options['is_error'] || options['error_message'] || options['exception'] + metric = options['metric'] + metric ||= options['uri'] # normalize this with url rules + raise "metric or uri arguments required" unless metric + metric_info = NewRelic::MetricParser.for_metric_named(metric) + + if metric_info.is_web_transaction? + NewRelic::Agent::Instrumentation::MetricFrame.record_apdex(metric_info, duration_seconds, duration_seconds, is_error) + histogram.process(duration_seconds) + end + metrics = metric_info.summary_metrics + + metrics << metric + metrics.each do |name| + stats = stats_engine.get_stats_no_scope(name) + stats.record_data_point(duration_seconds) + end + + if is_error + if error_message + e = Exception.new error_message if error_message + error_collector.notice_error e, :uri => uri, :metric => uri + end + end + # busy time ? + end + + # This method is deprecated. Use NewRelic::Agent.manual_start + def manual_start(ignored=nil, also_ignored=nil) + raise "This method no longer supported. Instead use the class method NewRelic::Agent.manual_start" + end + + # This method should be called in a forked process after a fork. + # It assumes the parent process initialized the agent, but does + # not assume the agent started. + # + # The call is idempotent, but not re-entrant. + # + # * It clears any metrics carried over from the parent process + # * Restarts the sampler thread if necessary + # * Initiates a new agent run and worker loop unless that was done + # in the parent process and +:force_reconnect+ is not true + # + # Options: + # * <tt>:force_reconnect => true</tt> to force the spawned process to + # establish a new connection, such as when forking a long running process. + # The default is false--it will only connect to the server if the parent + # had not connected. + # * <tt>:keep_retrying => false</tt> if we try to initiate a new + # connection, this tells me to only try it once so this method returns + # quickly if there is some kind of latency with the server. + def after_fork(options={}) + + # @connected gets false after we fail to connect or have an error + # connecting. @connected has nil if we haven't finished trying to connect. + # or we didn't attempt a connection because this is the master process + + # log.debug "Agent received after_fork notice in #$$: [#{control.agent_enabled?}; monitor=#{control.monitor_mode?}; connected: #{@connected.inspect}; thread=#{@worker_thread.inspect}]" + return if !control.agent_enabled? or + !control.monitor_mode? or + @connected == false or + @worker_thread && @worker_thread.alive? + + log.info "Starting the worker thread in #$$ after forking." + + # Clear out stats that are left over from parent process + reset_stats + + # Don't ever check to see if this is a spawner. If we're in a forked process + # I'm pretty sure we're not also forking new instances. + start_worker_thread(options) + @stats_engine.start_sampler_thread + end + + # True if we have initialized and completed 'start' + def started? + @started + end - # Start the event loop and initiate connection if necessary - start_worker_thread - - # Our shutdown handler needs to run after other shutdown handlers - # that may be doing things like running the app (hello sinatra). - if RUBY_VERSION =~ /rubinius/i - list = at_exit { shutdown } - # move the shutdown handler to the front of the list, to - # execute last: - list.unshift(list.pop) - elsif !defined?(JRuby) or !defined?(Sinatra::Application) - at_exit { at_exit { shutdown } } + # Return nil if not yet connected, true if successfully started + # and false if we failed to start. + def connected? + @connected end - end - control.log! "New Relic RPM Agent #{NewRelic::VERSION::STRING} Initialized: pid = #$$" - control.log! "Agent Log found in #{NewRelic::Control.instance.log_file}" if NewRelic::Control.instance.log_file - end - - # Clear out the metric data, errors, and transaction traces. Reset the histogram data. - def reset_stats - @stats_engine.reset_stats - @unsent_errors = [] - @traces = nil - @unsent_timeslice_data = {} - @last_harvest_time = Time.now - @launch_time = Time.now - @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10) - end - private - def collector - @collector ||= control.server - end - - # Try to launch the worker thread and connect to the server. - # - # See #connect for a description of connection_options. - def start_worker_thread(connection_options = {}) - log.debug "Creating RPM worker thread." - @worker_thread = Thread.new do - begin - NewRelic::Agent.disable_all_tracing do - # We try to connect. If this returns false that means - # the server rejected us for a licensing reason and we should - # just exit the thread. If it returns nil - # that means it didn't try to connect because we're in the master. - connect(connection_options) - if @connected - # disable transaction sampling if disabled by the server and we're not in dev mode - if !control.developer_mode? && !@should_send_samples - @transaction_sampler.disable + # Attempt a graceful shutdown of the agent. + def shutdown + return if not started? + if @worker_loop + @worker_loop.stop + + log.debug "Starting Agent shutdown" + + # if litespeed, then ignore all future SIGUSR1 - it's + # litespeed trying to shut us down + + if control.dispatcher == :litespeed + Signal.trap("SIGUSR1", "IGNORE") + Signal.trap("SIGTERM", "IGNORE") + end + + begin + NewRelic::Agent.disable_all_tracing do + graceful_disconnect end - log.info "Reporting performance data every #{@report_period} seconds." - log.debug "Running worker loop" - # note if the agent attempts to report more frequently than allowed by the server - # the server will start dropping data. - @worker_loop = WorkerLoop.new - @worker_loop.run(@report_period) do - harvest_and_send_timeslice_data - harvest_and_send_slowest_sample if @should_send_samples - harvest_and_send_errors if error_collector.enabled + rescue => e + log.error e + log.error e.backtrace.join("\n") + end + end + @started = nil + end + + def start_transaction + @stats_engine.start_transaction + end + + def end_transaction + @stats_engine.end_transaction + end + + def set_record_sql(should_record) + prev = Thread::current[:record_sql] + Thread::current[:record_sql] = should_record + prev.nil? || prev + end + + def set_record_tt(should_record) + prev = Thread::current[:record_tt] + Thread::current[:record_tt] = should_record + prev.nil? || prev + end + # Push flag indicating whether we should be tracing in this + # thread. + def push_trace_execution_flag(should_trace=false) + (Thread.current[:newrelic_untraced] ||= []) << should_trace + end + + # Pop the current trace execution status. Restore trace execution status + # to what it was before we pushed the current flag. + def pop_trace_execution_flag + Thread.current[:newrelic_untraced].pop if Thread.current[:newrelic_untraced] + end + + def set_sql_obfuscator(type, &block) + if type == :before + @obfuscator = NewRelic::ChainedCall.new(block, @obfuscator) + elsif type == :after + @obfuscator = NewRelic::ChainedCall.new(@obfuscator, block) + elsif type == :replace + @obfuscator = block + else + fail "unknown sql_obfuscator type #{type}" + end + end + + def log + NewRelic::Agent.logger + end + + # Start up the agent. This verifies that the agent_enabled? is + # true and initializes the sampler based on the current + # configuration settings. Then it will fire up the background + # thread for sending data to the server if applicable. + def start + if started? + control.log! "Agent Started Already!", :error + return + end + return if !control.agent_enabled? + @started = true + @local_host = determine_host + + if control.dispatcher.nil? || control.dispatcher.to_s.empty? + log.info "No dispatcher detected." + else + log.info "Dispatcher: #{control.dispatcher.to_s}" + end + log.info "Application: #{control.app_names.join(", ")}" unless control.app_names.empty? + + sampler_config = control.fetch('transaction_tracer', {}) + # TODO: Should move this state into the transaction sampler instance + @should_send_samples = @config_should_send_samples = sampler_config.fetch('enabled', true) + @should_send_random_samples = sampler_config.fetch('random_sample', false) + @explain_threshold = sampler_config.fetch('explain_threshold', 0.5).to_f + @explain_enabled = sampler_config.fetch('explain_enabled', true) + @record_sql = sampler_config.fetch('record_sql', :obfuscated).to_sym + + # use transaction_threshold: 4.0 to force the TT collection + # threshold to 4 seconds + # use transaction_threshold: apdex_f to use your apdex t value + # multiplied by 4 + # undefined transaction_threshold defaults to 2.0 + apdex_f = 4 * NewRelic::Control.instance.apdex_t + @slowest_transaction_threshold = sampler_config.fetch('transaction_threshold', 2.0) + if @slowest_transaction_threshold =~ /apdex_f/i + @slowest_transaction_threshold = apdex_f + end + @slowest_transaction_threshold = @slowest_transaction_threshold.to_f + + log.warn "Agent is configured to send raw SQL to RPM service" if @record_sql == :raw + + case + when !control.monitor_mode? + log.warn "Agent configured not to send data in this environment - edit newrelic.yml to change this" + when !control.license_key + log.error "No license key found. Please edit your newrelic.yml file and insert your license key." + when control.license_key.length != 40 + log.error "Invalid license key: #{control.license_key}" + when [:passenger, :unicorn].include?(control.dispatcher) + log.info "Connecting workers after forking." + else + # Do the connect in the foreground if we are in sync mode + NewRelic::Agent.disable_all_tracing { connect(:keep_retrying => false) } if control.sync_startup + + # Start the event loop and initiate connection if necessary + start_worker_thread + + # Our shutdown handler needs to run after other shutdown handlers + # that may be doing things like running the app (hello sinatra). + if control.send_data_on_exit + if RUBY_VERSION =~ /rubinius/i + list = at_exit { shutdown } + # move the shutdown handler to the front of the list, to + # execute last: + list.unshift(list.pop) + elsif !defined?(JRuby) or !defined?(Sinatra::Application) + at_exit { at_exit { shutdown } } end + end + end + log.info "New Relic RPM Agent #{NewRelic::VERSION::STRING} Initialized: pid = #$$" + log.info "Agent Log found in #{NewRelic::Control.instance.log_file}" if NewRelic::Control.instance.log_file + end + + # Clear out the metric data, errors, and transaction traces. Reset the histogram data. + def reset_stats + @stats_engine.reset_stats + @unsent_errors = [] + @traces = nil + @unsent_timeslice_data = {} + @last_harvest_time = Time.now + @launch_time = Time.now + @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10) + end + + private + def collector + @collector ||= control.server + end + + # Try to launch the worker thread and connect to the server. + # + # See #connect for a description of connection_options. + def start_worker_thread(connection_options = {}) + log.debug "Creating RPM worker thread." + @worker_thread = Thread.new do + begin + NewRelic::Agent.disable_all_tracing do + # We try to connect. If this returns false that means + # the server rejected us for a licensing reason and we should + # just exit the thread. If it returns nil + # that means it didn't try to connect because we're in the master. + connect(connection_options) + if @connected + # disable transaction sampling if disabled by the server and we're not in dev mode + if !control.developer_mode? && !@should_send_samples + @transaction_sampler.disable + else + @transaction_sampler.enable # otherwise ensure TT's are enabled + end + log.info "Reporting performance data every #{@report_period} seconds." + log.debug "Running worker loop" + # Note if the agent attempts to report more frequently than allowed by the server + # the server will start dropping data. + @worker_loop = WorkerLoop.new + @worker_loop.run(@report_period) do + harvest_and_send_timeslice_data + harvest_and_send_slowest_sample if @should_send_samples + harvest_and_send_errors if error_collector.enabled + end + else + log.debug "No connection. Worker thread finished." + end + end + rescue NewRelic::Agent::ForceRestartException => e + log.info e.message + # disconnect and start over. + # clear the stats engine + reset_stats + @metric_ids = {} + @connected = nil + # Wait a short time before trying to reconnect + sleep 30 + retry + rescue NewRelic::Agent::ForceDisconnectException => e + # when a disconnect is requested, stop the current thread, which + # is the worker thread that gathers data and talks to the + # server. + log.error "RPM forced this agent to disconnect (#{e.message})" + @connected = false + rescue NewRelic::Agent::ServerConnectionException => e + log.error "Unable to establish connection with the server. Run with log level set to debug for more information." + log.debug("#{e.class.name}: #{e.message}\n#{e.backtrace.first}") + @connected = false + rescue Exception => e + log.error "Terminating worker loop: #{e.class.name}: #{e}\n #{e.backtrace.join("\n ")}" + @connected = false + end # begin + end # thread new + @worker_thread['newrelic_label'] = 'Worker Loop' + end + + def control + NewRelic::Control.instance + end + + # Connect to the server and validate the license. If successful, + # @connected has true when finished. If not successful, you can + # keep calling this. Return false if we could not establish a + # connection with the server and we should not retry, such as if + # there's a bad license key. + # + # Set keep_retrying=false to disable retrying and return asap, such as when + # invoked in the foreground. Otherwise this runs until a successful + # connection is made, or the server rejects us. + # + # * <tt>:keep_retrying => false</tt> to only try to connect once, and + # return with the connection set to nil. This ensures we may try again + # later (default true). + # * <tt>force_reconnect => true</tt> if you want to establish a new connection + # to the server before running the worker loop. This means you get a separate + # agent run and RPM sees it as a separate instance (default is false). + def connect(options) + # Don't proceed if we already connected (@connected=true) or if we tried + # to connect and were rejected with prejudice because of a license issue + # (@connected=false). + return if !@connected.nil? && !options[:force_reconnect] + keep_retrying = options[:keep_retrying].nil? || options[:keep_retrying] + + # wait a few seconds for the web server to boot, necessary in development + connect_retry_period = keep_retrying ? 10 : 0 + connect_attempts = 0 + @agent_id = nil + begin + sleep connect_retry_period.to_i + log.debug "Connecting Process to RPM: #$0" + host = invoke_remote(:get_redirect_host) + @collector = control.server_from_host(host) if host + environment = control['send_environment_info'] != false ? control.local_env.snapshot : [] + log.debug "Connecting with validation seed/token: #{control.validate_seed}/#{control.validate_token}" if control.validate_seed + connect_data = invoke_remote :connect, + :pid => $$, + :host => @local_host, + :app_name => control.app_names, + :language => 'ruby', + :agent_version => NewRelic::VERSION::STRING, + :environment => environment, + :settings => control.settings, + :validate => {:seed => control.validate_seed, + :token => control.validate_token } + + @agent_id = connect_data['agent_run_id'] + @report_period = connect_data['data_report_period'] + @url_rules = connect_data['url_rules'] + + control.log! "Connected to NewRelic Service at #{@collector}" + log.debug "Agent Run = #{@agent_id}." + log.debug "Connection data = #{connect_data.inspect}" + + # Ask the server for permission to send transaction samples. + # determined by subscription license. + @should_send_samples = @config_should_send_samples && connect_data['collect_traces'] + + if @should_send_samples + if @should_send_random_samples + @transaction_sampler.random_sampling = true + @transaction_sampler.sampling_rate = connect_data['sampling_rate'] + log.info "Transaction sampling enabled, rate = #{@transaction_sampler.sampling_rate}" + end + log.debug "Transaction tracing threshold is #{@slowest_transaction_threshold} seconds." else - log.debug "No connection. Worker thread finished." + log.debug "Transaction traces will not be sent to the RPM service." end + + # Ask for permission to collect error data + error_collector.enabled = error_collector.config_enabled && connect_data['collect_errors'] + + log.debug "Errors will be sent to the RPM service." if error_collector.enabled + + @connected_pid = $$ + @connected = true + + rescue NewRelic::Agent::LicenseException => e + log.error e.message + log.info "Visit NewRelic.com to obtain a valid license key, or to upgrade your account." + @connected = false + + rescue Timeout::Error, StandardError => e + if e.instance_of? NewRelic::Agent::ServerConnectionException + log.info "Unable to establish connection with New Relic RPM Service at #{control.server}: #{e.message}" + log.debug e.backtrace.join("\n") + else + log.error "Error establishing connection with New Relic RPM Service at #{control.server}: #{e.message}" + log.debug e.backtrace.join("\n") + end + # retry logic + if keep_retrying + connect_attempts += 1 + case connect_attempts + when 1..2 + connect_retry_period, period_msg = 60, "1 minute" + when 3..5 + connect_retry_period, period_msg = 60 * 2, "2 minutes" + else + connect_retry_period, period_msg = 5 * 60, "5 minutes" + end + log.info "Will re-attempt in #{period_msg}" + retry + else + @connected = nil + end end - rescue NewRelic::Agent::ForceRestartException => e - log.info e.message - # disconnect and start over. - # clear the stats engine - reset_stats - @metric_ids = {} - @connected = nil - # Wait a short time before trying to reconnect - sleep 30 - retry - rescue NewRelic::Agent::ForceDisconnectException => e - # when a disconnect is requested, stop the current thread, which - # is the worker thread that gathers data and talks to the - # server. - log.error "RPM forced this agent to disconnect (#{e.message})" - @connected = false - rescue NewRelic::Agent::ServerConnectionException => e - control.log! "Unable to establish connection with the server. Run with log level set to debug for more information." - log.debug("#{e.class.name}: #{e.message}\n#{e.backtrace.first}") - @connected = false - rescue Exception => e - log.error "Terminating worker loop: #{e.class.name}: #{e}\n #{e.backtrace.join("\n ")}" - @connected = false - end # begin - end # thread new - @worker_thread['newrelic_label'] = 'Worker Loop' - end - - def control - NewRelic::Control.instance - end - - def initialize - @launch_time = Time.now - - @metric_ids = {} - @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10) - @stats_engine = NewRelic::Agent::StatsEngine.new - @transaction_sampler = NewRelic::Agent::TransactionSampler.new - @stats_engine.transaction_sampler = @transaction_sampler - @error_collector = NewRelic::Agent::ErrorCollector.new - - @request_timeout = NewRelic::Control.instance.fetch('timeout', 2 * 60) - - @last_harvest_time = Time.now - @obfuscator = method(:default_sql_obfuscator) - end - - # Connect to the server and validate the license. If successful, - # @connected has true when finished. If not successful, you can - # keep calling this. Return false if we could not establish a - # connection with the server and we should not retry, such as if - # there's a bad license key. - # - # Set keep_retrying=false to disable retrying and return asap, such as when - # invoked in the foreground. Otherwise this runs until a successful - # connection is made, or the server rejects us. - # - # * <tt>:keep_retrying => false</tt> to only try to connect once, and - # return with the connection set to nil. This ensures we may try again - # later (default true). - # * <tt>force_reconnect => true</tt> if you want to establish a new connection - # to the server before running the worker loop. This means you get a separate - # agent run and RPM sees it as a separate instance (default is false). - def connect(options) - # Don't proceed if we already connected (@connected=true) or if we tried - # to connect and were rejected with prejudice because of a license issue - # (@connected=false). - return if !@connected.nil? && !options[:force_reconnect] - - keep_retrying = options[:keep_retrying].nil? || options[:keep_retrying] - - # wait a few seconds for the web server to boot, necessary in development - connect_retry_period = keep_retrying ? 10 : 0 - connect_attempts = 0 - @agent_id = nil - begin - sleep connect_retry_period.to_i - environment = control['send_environment_info'] != false ? control.local_env.snapshot : [] - log.debug "Connecting with validation seed/token: #{control.validate_seed}/#{control.validate_token}" if control.validate_seed - @agent_id ||= invoke_remote :start, @local_host, { - :pid => $$, - :launch_time => @launch_time.to_f, - :agent_version => NewRelic::VERSION::STRING, - :environment => environment, - :settings => control.settings, - :validate_seed => control.validate_seed, - :validate_token => control.validate_token } - - host = invoke_remote(:get_redirect_host) - - @collector = control.server_from_host(host) if host - - @report_period = invoke_remote :get_data_report_period, @agent_id - - control.log! "Connected to NewRelic Service at #{@collector}" - log.debug "Agent ID = #{@agent_id}." - - # Ask the server for permission to send transaction samples. - # determined by subscription license. - @should_send_samples &&= invoke_remote :should_collect_samples, @agent_id - - if @should_send_samples - sampling_rate = invoke_remote :sampling_rate, @agent_id if @random_sample - @transaction_sampler.sampling_rate = sampling_rate - log.info "Transaction sample rate: #{@transaction_sampler.sampling_rate}" if sampling_rate - log.info "Transaction tracing threshold is #{@slowest_transaction_threshold} seconds." end + + def determine_host + Socket.gethostname + end + + def determine_home_directory + control.root + end - # Ask for permission to collect error data - error_collector.enabled &&= invoke_remote(:should_collect_errors, @agent_id) - - log.info "Transaction traces will be sent to the RPM service." if @should_send_samples - log.info "Errors will be sent to the RPM service." if error_collector.enabled - - @connected_pid = $$ - @connected = true - - rescue NewRelic::Agent::LicenseException => e - control.log! e.message, :error - control.log! "Visit NewRelic.com to obtain a valid license key, or to upgrade your account." - @connected = false - - rescue Timeout::Error, StandardError => e - log.info "Unable to establish connection with New Relic RPM Service at #{control.server}" - unless e.instance_of? NewRelic::Agent::ServerConnectionException - log.error e.message - log.debug e.backtrace.join("\n") + def is_application_spawner? + $0 =~ /ApplicationSpawner|^unicorn\S* master/ end - # retry logic - if keep_retrying - connect_attempts += 1 - case connect_attempts - when 1..2 - connect_retry_period, period_msg = 60, "1 minute" - when 3..5 - connect_retry_period, period_msg = 60 * 2, "2 minutes" - else - connect_retry_period, period_msg = 5 * 60, "5 minutes" + + def harvest_and_send_timeslice_data + + NewRelic::Agent::BusyCalculator.harvest_busy + + now = Time.now + + @unsent_timeslice_data ||= {} + @unsent_timeslice_data = @stats_engine.harvest_timeslice_data(@unsent_timeslice_data, @metric_ids) + + begin + # In this version of the protocol, we get back an assoc array of spec to id. + metric_ids = invoke_remote(:metric_data, @agent_id, + @last_harvest_time.to_f, + now.to_f, + @unsent_timeslice_data.values) + + rescue Timeout::Error + # assume that the data was received. chances are that it was + metric_ids = nil end - log.info "Will re-attempt in #{period_msg}" - retry - else - @connected = nil + + metric_ids.each do | spec, id | + @metric_ids[spec] = id + end if metric_ids + + log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@agent_id}) in #{Time.now - now} seconds" + + # if we successfully invoked this web service, then clear the unsent message cache. + @unsent_timeslice_data = {} + @last_harvest_time = now + + # handle_messages + + # note - exceptions are logged in invoke_remote. If an exception is encountered here, + # then the metric data is downsampled for another timeslices end - end - end - - def determine_host - Socket.gethostname - end - - def determine_home_directory - control.root - end - - def harvest_and_send_timeslice_data - - NewRelic::Agent::BusyCalculator.harvest_busy - - now = Time.now - - @unsent_timeslice_data ||= {} - @unsent_timeslice_data = @stats_engine.harvest_timeslice_data(@unsent_timeslice_data, @metric_ids) - - begin - # In this version of the protocol, we get back an assoc array of spec to id. - metric_ids = invoke_remote(:metric_data, @agent_id, - @last_harvest_time.to_f, - now.to_f, - @unsent_timeslice_data.values) - - rescue Timeout::Error - # assume that the data was received. chances are that it was - metric_ids = nil - end - - metric_ids.each do | spec, id | - @metric_ids[spec] = id - end if metric_ids - - log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@agent_id}) in #{Time.now - now} seconds" - - # if we successfully invoked this web service, then clear the unsent message cache. - @unsent_timeslice_data = {} - @last_harvest_time = now - - # handle_messages - - # note - exceptions are logged in invoke_remote. If an exception is encountered here, - # then the metric data is downsampled for another timeslices - end - - def harvest_and_send_slowest_sample - @traces = @transaction_sampler.harvest(@traces, @slowest_transaction_threshold) - - unless @traces.empty? - now = Time.now - log.debug "Sending (#{@traces.length}) transaction traces" - begin - # take the traces and prepare them for sending across the - # wire. This includes gathering SQL explanations, stripping - # out stack traces, and normalizing SQL. note that we - # explain only the sql statements whose segments' execution - # times exceed our threshold (to avoid unnecessary overhead - # of running explains on fast queries.) - traces = @traces.collect {|trace| trace.prepare_to_send(:explain_sql => @explain_threshold, :record_sql => @record_sql, :keep_backtraces => true, :explain_enabled => @explain_enabled)} - invoke_remote :transaction_sample_data, @agent_id, traces - rescue PostTooBigException - # we tried to send too much data, drop the first trace and - # try again - retry if @traces.shift + + def harvest_and_send_slowest_sample + @traces = @transaction_sampler.harvest(@traces, @slowest_transaction_threshold) + + unless @traces.empty? + now = Time.now + log.debug "Sending (#{@traces.length}) transaction traces" + begin + # take the traces and prepare them for sending across the + # wire. This includes gathering SQL explanations, stripping + # out stack traces, and normalizing SQL. note that we + # explain only the sql statements whose segments' execution + # times exceed our threshold (to avoid unnecessary overhead + # of running explains on fast queries.) + options = { :keep_backtraces => true } + options[:record_sql] = @record_sql unless @record_sql == :off + options[:explain_sql] = @explain_threshold if @explain_enabled + traces = @traces.collect {|trace| trace.prepare_to_send(options)} + invoke_remote :transaction_sample_data, @agent_id, traces + rescue PostTooBigException + # we tried to send too much data, drop the first trace and + # try again + retry if @traces.shift + end + + log.debug "Sent slowest sample (#{@agent_id}) in #{Time.now - now} seconds" + end + + # if we succeed sending this sample, then we don't need to keep + # the slowest sample around - it has been sent already and we + # can collect the next one + @traces = nil + + # note - exceptions are logged in invoke_remote. If an + # exception is encountered here, then the slowest sample of is + # determined of the entire period since the last reported + # sample. end - - log.debug "Sent slowest sample (#{@agent_id}) in #{Time.now - now} seconds" - end - - # if we succeed sending this sample, then we don't need to keep - # the slowest sample around - it has been sent already and we - # can collect the next one - @traces = nil - - # note - exceptions are logged in invoke_remote. If an - # exception is encountered here, then the slowest sample of is - # determined of the entire period since the last reported - # sample. - end - - def harvest_and_send_errors - @unsent_errors = @error_collector.harvest_errors(@unsent_errors) - if @unsent_errors && @unsent_errors.length > 0 - log.debug "Sending #{@unsent_errors.length} errors" - begin - invoke_remote :error_data, @agent_id, @unsent_errors - rescue PostTooBigException - @unsent_errors.shift - retry + + def harvest_and_send_errors + @unsent_errors = @error_collector.harvest_errors(@unsent_errors) + if @unsent_errors && @unsent_errors.length > 0 + log.debug "Sending #{@unsent_errors.length} errors" + begin + invoke_remote :error_data, @agent_id, @unsent_errors + rescue PostTooBigException + @unsent_errors.shift + retry + end + # if the remote invocation fails, then we never clear + # @unsent_errors, and therefore we can re-attempt to send on + # the next heartbeat. Note the error collector maxes out at + # 20 instances to prevent leakage + @unsent_errors = [] + end end - # if the remote invocation fails, then we never clear - # @unsent_errors, and therefore we can re-attempt to send on - # the next heartbeat. Note the error collector maxes out at - # 20 instances to prevent leakage - @unsent_errors = [] - end - end - def compress_data(object) - dump = Marshal.dump(object) - - # this checks to make sure mongrel won't choke on big uploads - check_post_size(dump) - - # we currently optimize for CPU here since we get roughly a 10x - # reduction in message size with this, and CPU overhead is at a - # premium. For extra-large posts, we use the higher compression - # since otherwise it actually errors out. - - dump_size = dump.size - - # small payloads don't need compression - return [dump, 'identity'] if dump_size < (64*1024) - - # medium payloads get fast compression, to save CPU - # big payloads get all the compression possible, to stay under - # the 2,000,000 byte post threshold - compression = dump_size < 2000000 ? Zlib::BEST_SPEED : Zlib::BEST_COMPRESSION - - [Zlib::Deflate.deflate(dump, compression), 'deflate'] - end - - def check_post_size(post_string) - # TODO: define this as a config option on the server side - return if post_string.size < control.post_size_limit - log.warn "Tried to send too much data: #{post_string.size} bytes" - raise PostTooBigException - end + def compress_data(object) + dump = Marshal.dump(object) - def send_request(opts) - request = Net::HTTP::Post.new(opts[:uri], 'CONTENT-ENCODING' => opts[:encoding], 'HOST' => opts[:collector].name) - request.content_type = "application/octet-stream" - request.body = opts[:data] - - log.debug "Connect to #{opts[:collector]}#{opts[:uri]}" - - response = nil - http = control.http_connection(collector) - begin - timeout(@request_timeout) do - response = http.request(request) + # this checks to make sure mongrel won't choke on big uploads + check_post_size(dump) + + # we currently optimize for CPU here since we get roughly a 10x + # reduction in message size with this, and CPU overhead is at a + # premium. For extra-large posts, we use the higher compression + # since otherwise it actually errors out. + + dump_size = dump.size + + # Compress if content is smaller than 64kb. There are problems + # with bugs in Ruby in some versions that expose us to a risk of + # segfaults if we compress aggressively. + return [dump, 'identity'] if dump_size < (64*1024) + + # medium payloads get fast compression, to save CPU + # big payloads get all the compression possible, to stay under + # the 2,000,000 byte post threshold + compression = dump_size < 2000000 ? Zlib::BEST_SPEED : Zlib::BEST_COMPRESSION + + [Zlib::Deflate.deflate(dump, compression), 'deflate'] end - rescue Timeout::Error - log.warn "Timed out trying to post data to RPM (timeout = #{@request_timeout} seconds)" unless @request_timeout < 30 - raise - end - if response.is_a? Net::HTTPServiceUnavailable - raise NewRelic::Agent::ServerConnectionException, "Service unavailable: #{response.body || response.message}" - elsif response.is_a? Net::HTTPGatewayTimeOut - log.debug("Timed out getting response: #{response.message}") - raise Timeout::Error, response.message - elsif !(response.is_a? Net::HTTPSuccess) - raise NewRelic::Agent::ServerConnectionException, "Unexpected response from server: #{response.code}: #{response.message}" - end - response - end - def decompress_response(response) - if response['content-encoding'] != 'gzip' - log.debug "Uncompressed content returned" - return response.body - end - log.debug "Decompressing return value" - i = Zlib::GzipReader.new(StringIO.new(response.body)) - i.read - end + def check_post_size(post_string) + # TODO: define this as a config option on the server side + return if post_string.size < control.post_size_limit + log.warn "Tried to send too much data: #{post_string.size} bytes" + raise PostTooBigException + end - def check_for_exception(response) - dump = decompress_response(response) - value = Marshal.load(dump) - raise value if value.is_a? Exception - value - end - - def remote_method_uri(method) - uri = "/agent_listener/#{PROTOCOL_VERSION}/#{control.license_key}/#{method}" - uri << "?run_id=#{@agent_id}" if @agent_id - uri - end - - # send a message via post - def invoke_remote(method, *args) - #determines whether to zip the data or send plain - post_data, encoding = compress_data(args) - - response = send_request({:uri => remote_method_uri(method), :encoding => encoding, :collector => collector, :data => post_data}) + def send_request(opts) + request = Net::HTTP::Post.new(opts[:uri], 'CONTENT-ENCODING' => opts[:encoding], 'HOST' => opts[:collector].name) + request.content_type = "application/octet-stream" + request.body = opts[:data] - # raises the right exception if the remote server tells it to die - return check_for_exception(response) - rescue NewRelic::Agent::ForceRestartException => e - log.info e.message - raise - rescue SystemCallError, SocketError => e - # These include Errno connection errors - raise NewRelic::Agent::ServerConnectionException, "Recoverable error connecting to the server: #{e}" - end - - def graceful_disconnect - if @connected - begin - log.debug "Sending graceful shutdown message to #{control.server}" - - @request_timeout = 10 - log.debug "Flushing unsent metric data to server" - @worker_loop.run_task - if @connected_pid == $$ - log.debug "Sending RPM service agent run shutdown message" - invoke_remote :shutdown, @agent_id, Time.now.to_f + log.debug "Connect to #{opts[:collector]}#{opts[:uri]}" + + response = nil + http = control.http_connection(collector) + http.read_timeout = nil + begin + NewRelic::TimerLib.timeout(@request_timeout) do + response = http.request(request) + end + rescue Timeout::Error + log.warn "Timed out trying to post data to RPM (timeout = #{@request_timeout} seconds)" unless @request_timeout < 30 + raise + end + if response.is_a? Net::HTTPServiceUnavailable + raise NewRelic::Agent::ServerConnectionException, "Service unavailable (#{response.code}): #{response.message}" + elsif response.is_a? Net::HTTPGatewayTimeOut + log.debug("Timed out getting response: #{response.message}") + raise Timeout::Error, response.message + elsif response.is_a? Net::HTTPRequestEntityTooLarge + raise PostTooBigException + elsif !(response.is_a? Net::HTTPSuccess) + raise NewRelic::Agent::ServerConnectionException, "Unexpected response from server (#{response.code}): #{response.message}" + end + response + end + + def decompress_response(response) + if response['content-encoding'] != 'gzip' + log.debug "Uncompressed content returned" + return response.body + end + log.debug "Decompressing return value" + i = Zlib::GzipReader.new(StringIO.new(response.body)) + i.read + end + + def check_for_exception(response) + dump = decompress_response(response) + value = Marshal.load(dump) + raise value if value.is_a? Exception + value + end + + def remote_method_uri(method) + uri = "/agent_listener/#{PROTOCOL_VERSION}/#{control.license_key}/#{method}" + uri << "?run_id=#{@agent_id}" if @agent_id + uri + end + + # send a message via post + def invoke_remote(method, *args) + #determines whether to zip the data or send plain + post_data, encoding = compress_data(args) + + response = send_request({:uri => remote_method_uri(method), :encoding => encoding, :collector => collector, :data => post_data}) + + # raises the right exception if the remote server tells it to die + return check_for_exception(response) + rescue NewRelic::Agent::ForceRestartException => e + log.info e.message + raise + rescue SystemCallError, SocketError => e + # These include Errno connection errors + raise NewRelic::Agent::ServerConnectionException, "Recoverable error connecting to the server: #{e}" + end + + def graceful_disconnect + if @connected + begin + @request_timeout = 10 + log.debug "Flushing unsent metric data to server" + @worker_loop.run_task + if @connected_pid == $$ + log.debug "Sending RPM service agent run shutdown message" + invoke_remote :shutdown, @agent_id, Time.now.to_f + else + log.debug "This agent connected from parent process #{@connected_pid}--not sending shutdown" + end + log.debug "Graceful disconnect complete" + rescue Timeout::Error, StandardError + end else - log.debug "This agent connected from #{@connected_pid}--not sending shutdown" + log.debug "Bypassing graceful disconnect - agent not connected" end - log.debug "Graceful disconnect complete" - rescue Timeout::Error, StandardError end - else - log.debug "Bypassing graceful disconnect - agent not connected" + def default_sql_obfuscator(sql) + sql = sql.dup + # This is hardly readable. Use the unit tests. + # remove single quoted strings: + sql.gsub!(/'(.*?[^\\'])??'(?!')/, '?') + # remove double quoted strings: + sql.gsub!(/"(.*?[^\\"])??"(?!")/, '?') + # replace all number literals + sql.gsub!(/\d+/, "?") + sql + end end + + extend ClassMethods + include InstanceMethods end - def default_sql_obfuscator(sql) - sql = sql.dup - # This is hardly readable. Use the unit tests. - # remove single quoted strings: - sql.gsub!(/'(.*?[^\\'])??'(?!')/, '?') - # remove double quoted strings: - sql.gsub!(/"(.*?[^\\"])??"(?!")/, '?') - # replace all number literals - sql.gsub!(/\d+/, "?") - sql - end end - -end end