lib/new_relic/agent/agent.rb in newrelic_rpm-2.9.9 vs lib/new_relic/agent/agent.rb in newrelic_rpm-2.10.3

- old
+ new

@@ -6,27 +6,31 @@ require 'stringio' # The NewRelic Agent collects performance data from ruby applications # in realtime as the application runs, and periodically sends that # data to the NewRelic server. -module NewRelic::Agent +module NewRelic + module Agent # The Agent is a singleton that is instantiated when the plugin is # activated. class Agent # Specifies the version of the agent's communication protocol with # the NewRelic hosted site. - PROTOCOL_VERSION = 5 + PROTOCOL_VERSION = 8 attr_reader :obfuscator attr_reader :stats_engine attr_reader :transaction_sampler attr_reader :error_collector - attr_reader :worker_loop + attr_reader :task_loop attr_reader :record_sql + attr_reader :histogram + attr_reader :metric_ids + attr_reader :should_send_errors # Should only be called by NewRelic::Control def self.instance @instance ||= self.new end @@ -39,32 +43,36 @@ # for passenger where processes are forked and the agent is # dormant # def ensure_worker_thread_started return unless control.agent_enabled? && control.monitor_mode? && !@invalid_license - if !running? + if !running? + # We got some reports of threading errors in Unicorn with this. + log.debug "Detected that the worker loop is not running. Restarting." rescue nil + # Assume we've been forked, clear out stats that are left over from parent process + reset_stats launch_worker_thread @stats_engine.spawn_sampler_thread end end # True if the worker thread has been started. Doesn't necessarily # mean we are connected def running? - control.agent_enabled? && control.monitor_mode? && @worker_loop && @worker_loop.pid == $$ + control.agent_enabled? && control.monitor_mode? && @task_loop && @task_loop.pid == $$ end # True if we have initialized and completed 'start' def started? @started end # Attempt a graceful shutdown of the agent. def shutdown return if not started? - if @worker_loop - @worker_loop.stop + if @task_loop + @task_loop.stop log.debug "Starting Agent shutdown" # if litespeed, then ignore all future SIGUSR1 - it's # litespeed trying to shut us down @@ -83,41 +91,38 @@ end @started = nil end def start_transaction - Thread::current[:custom_params] = nil @stats_engine.start_transaction end def end_transaction - Thread::current[:custom_params] = nil @stats_engine.end_transaction end def set_record_sql(should_record) prev = Thread::current[:record_sql] Thread::current[:record_sql] = should_record - - prev || true + prev.nil? || prev end def set_record_tt(should_record) prev = Thread::current[:record_tt] Thread::current[:record_tt] = should_record - - prev || true + prev.nil? || prev end - - def add_custom_parameters(params) - p = Thread::current[:custom_params] || (Thread::current[:custom_params] = {}) - - p.merge!(params) + # Push flag indicating whether we should be tracing in this + # thread. + def push_trace_execution_flag(should_trace=false) + (Thread.current[:newrelic_untraced] ||= []) << should_trace end - - def custom_params - Thread::current[:custom_params] || {} + + # Pop the current trace execution status. Restore trace execution status + # to what it was before we pushed the current flag. + def pop_trace_execution_flag + Thread.current[:newrelic_untraced].pop if Thread.current[:newrelic_untraced] end def set_sql_obfuscator(type, &block) if type == :before @obfuscator = NewRelic::ChainedCall.new(block, @obfuscator) @@ -145,15 +150,16 @@ end return if !control.agent_enabled? @local_host = determine_host - log.info "Web container: #{control.dispatcher.to_s}" - - if control.dispatcher == :passenger - log.warn "Phusion Passenger has been detected. Some RPM memory statistics may have inaccuracies due to short process lifespans." + if control.dispatcher.nil? || control.dispatcher.to_s.empty? + log.info "No dispatcher detected." + else + log.info "Dispatcher: #{control.dispatcher.to_s}" end + log.info "Application: #{control.app_names.join(", ")}" unless control.app_names.empty? @started = true sampler_config = control.fetch('transaction_tracer', {}) @use_transaction_sampler = sampler_config.fetch('enabled', true) @@ -163,11 +169,11 @@ # use transaction_threshold: 4.0 to force the TT collection # threshold to 4 seconds # use transaction_threshold: apdex_f to use your apdex t value # multiplied by 4 # undefined transaction_threshold defaults to 2.0 - apdex_f = 4 * NewRelic::Control.instance['apdex_t'].to_f + apdex_f = 4 * NewRelic::Control.instance.apdex_t @slowest_transaction_threshold = sampler_config.fetch('transaction_threshold', 2.0) if @slowest_transaction_threshold =~ /apdex_f/i @slowest_transaction_threshold = apdex_f end @slowest_transaction_threshold = @slowest_transaction_threshold.to_f @@ -183,12 +189,10 @@ log.warn "Agent is configured to send raw SQL to RPM service" if @record_sql == :raw # Initialize transaction sampler @transaction_sampler.random_sampling = @random_sample if control.monitor_mode? - # make sure the license key exists and is likely to be really a license key - # by checking it's string length (license keys are 40 character strings.) if !control.license_key @invalid_license = true control.log! "No license key found. Please edit your newrelic.yml file and insert your license key.", :error elsif control.license_key.length != 40 @invalid_license = true @@ -196,11 +200,11 @@ else launch_worker_thread # When the VM shuts down, attempt to send a message to the # server that this agent run is stopping, assuming it has # successfully connected - # This shutdown handler doesn't work if Sinatra or Unicorn is running + # This shutdown handler doesn't work if Sinatra is running # because it executes in the shutdown handler! at_exit { shutdown } unless [:sinatra, :unicorn].include? NewRelic::Control.instance.dispatcher end end control.log! "New Relic RPM Agent #{NewRelic::VERSION::STRING} Initialized: pid = #{$$}" @@ -212,88 +216,74 @@ @collector ||= control.server end # Connect to the server, and run the worker loop forever. # Will not return. - def run_worker_loop - - # connect to the server. this will keep retrying until - # successful or it determines the license is bad. - connect + def run_task_loop + # determine the reporting period (server based) + # note if the agent attempts to report more frequently than + # the specified report data, then it will be ignored. - # We may not be connected now but keep going for dev mode - if @connected - begin - # determine the reporting period (server based) - # note if the agent attempts to report more frequently than - # the specified report data, then it will be ignored. - - control.log! "Reporting performance data every #{@report_period} seconds." - @worker_loop.add_task(@report_period) do - harvest_and_send_timeslice_data - end - - if @should_send_samples && @use_transaction_sampler - @worker_loop.add_task(@report_period) do - harvest_and_send_slowest_sample - end - elsif !control.developer_mode? - # We still need the sampler for dev mode. - @transaction_sampler.disable - end - - if @should_send_errors && @error_collector.enabled - @worker_loop.add_task(@report_period) do - harvest_and_send_errors - end - end - @worker_loop.run - rescue StandardError - @connected = false - raise + control.log! "Reporting performance data every #{@report_period} seconds." + @task_loop.add_task(@report_period, "Timeslice Data Send") do + harvest_and_send_timeslice_data + end + + if @should_send_samples && @use_transaction_sampler + @task_loop.add_task(@report_period, "Transaction Sampler Send") do + harvest_and_send_slowest_sample end + elsif !control.developer_mode? + # We still need the sampler for dev mode. + @transaction_sampler.disable end + + if @should_send_errors && @error_collector.enabled + @task_loop.add_task(@report_period, "Error Send") do + harvest_and_send_errors + end + end + log.debug "Running worker loop" + @task_loop.run + rescue StandardError + log.debug "Error in worker loop: #{$!}" + @connected = false + raise end def launch_worker_thread if (control.dispatcher == :passenger && $0 =~ /ApplicationSpawner/) log.debug "Process is passenger spawner - don't connect to RPM service" return end - @worker_loop = WorkerLoop.new(log) + @task_loop = WorkerLoop.new(log) - if control['check_bg_loading'] - log.warn "Agent background loading checking turned on" - require 'new_relic/agent/patch_const_missing' - ClassLoadingWatcher.enable_warning - end - + log.debug "Creating RPM worker thread." @worker_thread = Thread.new do begin - ClassLoadingWatcher.background_thread=Thread.current if control['check_bg_loading'] - - run_worker_loop + NewRelic::Agent.disable_all_tracing do + connect + run_task_loop if @connected + end + rescue NewRelic::Agent::ForceRestartException => e + log.info e.message + # disconnect and start over. + # clear the stats engine + reset_stats + @connected = false + # Wait a short time before trying to reconnect + sleep 30 + retry rescue IgnoreSilentlyException control.log! "Unable to establish connection with the server. Run with log level set to debug for more information." - rescue StandardError => e - control.log! e, :error - control.log! e.backtrace.join("\n "), :error + rescue Exception => e + @connected = false + log.error "Terminating worker loop: #{e.class.name}: #{e}\n #{e.backtrace.join("\n ")}" end end @worker_thread['newrelic_label'] = 'Worker Loop' - - # This code should be activated to check that no dependency - # loading is occuring in the background thread by stopping the - # foreground thread after the background thread is created. Turn - # on dependency loading logging and make sure that no loading - # occurs. - # - # control.log! "FINISHED AGENT INIT" - # while true - # sleep 1 - # end end def control NewRelic::Control.instance end @@ -301,20 +291,22 @@ def initialize @connected = false @launch_time = Time.now @metric_ids = {} - + @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10) @stats_engine = NewRelic::Agent::StatsEngine.new - @transaction_sampler = NewRelic::Agent::TransactionSampler.new(self) - @error_collector = NewRelic::Agent::ErrorCollector.new(self) + @transaction_sampler = NewRelic::Agent::TransactionSampler.new + @stats_engine.transaction_sampler = @transaction_sampler + @error_collector = NewRelic::Agent::ErrorCollector.new @request_timeout = NewRelic::Control.instance.fetch('timeout', 2 * 60) @invalid_license = false @last_harvest_time = Time.now + @obfuscator = method(:default_sql_obfuscator) end # Connect to the server and validate the license. If successful, # @connected has true when finished. If not successful, you can # keep calling this. Return false if we could not establish a @@ -325,20 +317,21 @@ connect_retry_period = 5 connect_attempts = 0 @agent_id = nil begin sleep connect_retry_period.to_i + environment = control['send_environment_info'] != false ? control.local_env.snapshot : [] @agent_id ||= invoke_remote :start, @local_host, { :pid => $$, :launch_time => @launch_time.to_f, :agent_version => NewRelic::VERSION::STRING, - :environment => control.local_env.snapshot, + :environment => environment, :settings => control.settings, :validate_seed => ENV['NR_VALIDATE_SEED'], :validate_token => ENV['NR_VALIDATE_TOKEN'] } - host = invoke_remote(:get_redirect_host) rescue nil + host = invoke_remote(:get_redirect_host) @collector = control.server_from_host(host) if host @report_period = invoke_remote :get_data_report_period, @agent_id @@ -350,12 +343,11 @@ @should_send_samples = invoke_remote :should_collect_samples, @agent_id if @should_send_samples sampling_rate = invoke_remote :sampling_rate, @agent_id if @random_sample @transaction_sampler.sampling_rate = sampling_rate - - log.info "Transaction sample rate: #{@transaction_sampler.sampling_rate}" + log.info "Transaction sample rate: #{@transaction_sampler.sampling_rate}" if sampling_rate end # Ask for permission to collect error data @should_send_errors = invoke_remote :should_collect_errors, @agent_id @@ -396,40 +388,45 @@ end def determine_home_directory control.root end + def reset_stats + @stats_engine.reset_stats + @metric_ids = {} + @unsent_errors = [] + @traces = nil + @unsent_timeslice_data = {} + @last_harvest_time = Time.now + @launch_time = Time.now + @histogram = NewRelic::Histogram.new(NewRelic::Control.instance.apdex_t / 10) + end def harvest_and_send_timeslice_data - NewRelic::Agent::Instrumentation::DispatcherInstrumentation::BusyCalculator.harvest_busy + NewRelic::Agent::BusyCalculator.harvest_busy now = Time.now - - # Fixme: remove the harvest thread tracking - @harvest_thread ||= Thread.current - - if @harvest_thread != Thread.current - log.error "Two harvest threads are running (current=#{Thread.current}, harvest=#{@harvest_thread}" - @harvest_thread = Thread.current - end - + @unsent_timeslice_data ||= {} @unsent_timeslice_data = @stats_engine.harvest_timeslice_data(@unsent_timeslice_data, @metric_ids) begin + # In this version of the protocol, we get back an assoc array of spec to id. metric_ids = invoke_remote(:metric_data, @agent_id, @last_harvest_time.to_f, now.to_f, @unsent_timeslice_data.values) rescue Timeout::Error # assume that the data was received. chances are that it was metric_ids = nil end - @metric_ids.merge! metric_ids if metric_ids + metric_ids.each do | spec, id | + @metric_ids[spec] = id + end if metric_ids log.debug "#{now}: sent #{@unsent_timeslice_data.length} timeslices (#{@agent_id}) in #{Time.now - now} seconds" # if we successfully invoked this web service, then clear the unsent message cache. @unsent_timeslice_data = {} @@ -453,21 +450,18 @@ # out stack traces, and normalizing SQL. note that we # explain only the sql statements whose segments' execution # times exceed our threshold (to avoid unnecessary overhead # of running explains on fast queries.) traces = @traces.collect {|trace| trace.prepare_to_send(:explain_sql => @explain_threshold, :record_sql => @record_sql, :keep_backtraces => true, :explain_enabled => @explain_enabled)} - - invoke_remote :transaction_sample_data, @agent_id, traces rescue PostTooBigException # we tried to send too much data, drop the first trace and # try again - @traces.shift - retry + retry if @traces.shift end - log.debug "#{now}: sent slowest sample (#{@agent_id}) in #{Time.now - now} seconds" + log.debug "Sent slowest sample (#{@agent_id}) in #{Time.now - now} seconds" end # if we succeed sending this sample, then we don't need to keep # the slowest sample around - it has been sent already and we # can collect the next one @@ -498,10 +492,13 @@ end def compress_data(object) dump = Marshal.dump(object) + # this checks to make sure mongrel won't choke on big uploads + check_post_size(dump) + # we currently optimize for CPU here since we get roughly a 10x # reduction in message size with this, and CPU overhead is at a # premium. For extra-large posts, we use the higher compression # since otherwise it actually errors out. @@ -518,34 +515,39 @@ [Zlib::Deflate.deflate(dump, compression), 'deflate'] end def check_post_size(post_string) # TODO: define this as a config option on the server side - return if post_string.size < 2000000 - log.warn "Tried to send too much data, retrying with less: #{post_string.size} bytes" + return if post_string.size < control.post_size_limit + log.warn "Tried to send too much data: #{post_string.size} bytes" raise PostTooBigException end def send_request(opts) request = Net::HTTP::Post.new(opts[:uri], 'CONTENT-ENCODING' => opts[:encoding], 'ACCEPT-ENCODING' => 'gzip', 'HOST' => opts[:collector].name) request.content_type = "application/octet-stream" request.body = opts[:data] - log.debug "connect to #{opts[:collector]}#{opts[:uri]}" + log.debug "Connect to #{opts[:collector]}#{opts[:uri]}" response = nil http = control.http_connection(collector) begin timeout(@request_timeout) do response = http.request(request) end rescue Timeout::Error log.warn "Timed out trying to post data to RPM (timeout = #{@request_timeout} seconds)" unless @request_timeout < 30 - raise IgnoreSilentlyException + raise end - - if !(response.is_a? Net::HTTPSuccess) + if response.is_a? Net::HTTPServiceUnavailable + log.debug(response.body || response.message) + raise IgnoreSilentlyException + elsif response.is_a? Net::HTTPGatewayTimeOut + log.debug("Timed out getting response: #{response.message}") + raise Timeout::Error, response.message + elsif !(response.is_a? Net::HTTPSuccess) log.debug "Unexpected response from server: #{response.code}: #{response.message}" raise IgnoreSilentlyException end response end @@ -576,17 +578,17 @@ # send a message via post def invoke_remote(method, *args) #determines whether to zip the data or send plain post_data, encoding = compress_data(args) - # this checks to make sure mongrel won't choke on big uploads - check_post_size(post_data) - response = send_request({:uri => remote_method_uri(method), :encoding => encoding, :collector => collector, :data => post_data}) # raises the right exception if the remote server tells it to die return check_for_exception(response) + rescue ForceRestartException => e + log.info e.message + raise rescue ForceDisconnectException => e log.error "RPM forced this agent to disconnect (#{e.message})\n" \ "Restart this process to resume monitoring via rpm.newrelic.com." # when a disconnect is requested, stop the current thread, which # is the worker thread that gathers data and talks to the @@ -598,25 +600,40 @@ log.debug "Recoverable error connecting to the server: #{e}" raise IgnoreSilentlyException end def graceful_disconnect - if @connected && !(control.server.name == "localhost" && control.dispatcher_instance_id == '3000') + if @connected begin log.debug "Sending graceful shutdown message to #{control.server}" - @request_timeout = 10 + @request_timeout = 5 log.debug "Sending RPM service agent run shutdown message" + harvest_and_send_timeslice_data +# harvest_and_send_slowest_sample + harvest_and_send_errors invoke_remote :shutdown, @agent_id, Time.now.to_f log.debug "Graceful shutdown complete" rescue Timeout::Error, StandardError end else log.debug "Bypassing graceful shutdown - agent not connected" end end + def default_sql_obfuscator(sql) + sql = sql.dup + # This is hardly readable. Use the unit tests. + # remove single quoted strings: + sql.gsub!(/'(.*?[^\\'])??'(?!')/, '?') + # remove double quoted strings: + sql.gsub!(/"(.*?[^\\"])??"(?!")/, '?') + # replace all number literals + sql.gsub!(/\d+/, "?") + sql + end end +end end