module OneApm
  module Agent
    class Agent
      # All of this module used to be contained in the
      # start_worker_thread method - this is an artifact of
      # refactoring and can be moved, renamed, etc. at will.
      module StartWorkerThread
        # Never allow any data type to be reported more frequently than once
        # per second.
        # NOTE(review): only periods resolved through #report_period_for are
        # clamped to this value; :data_report_period is passed to the event
        # loop as-is in #create_and_run_event_loop.
        MIN_ALLOWED_REPORT_PERIOD = 1.0

        # Seconds between utilization reports (every half hour).
        UTILIZATION_REPORT_PERIOD = 30 * 60

        # Seconds between calls to the logger's #clear_already_logged.
        LOG_ONCE_KEYS_RESET_PERIOD = 60.0

        # Try to launch the worker thread and connect to the server.
        #
        # When the :disable_harvest_thread setting is truthy this only logs
        # and returns nil. Otherwise a thread named 'Worker Loop' is created,
        # stored in @worker_thread and returned; it runs #deferred_work!.
        #
        # See #connect for a description of connection_options.
        def start_worker_thread(connection_options = {})
          disable = ::OneApm::Agent.config[:disable_harvest_thread]
          if disable
            ::OneApm::Agent.logger.info "Not starting Ruby Agent worker thread because :disable_harvest_thread is #{disable}"
            return
          end

          ::OneApm::Agent.logger.info "Creating Ruby Agent worker thread."
          @worker_thread = ::OneApm::Agent::Threading::AgentThread.create('Worker Loop') do
            deferred_work!(connection_options)
          end
        end

        # This is the method that is run in a new thread in order to
        # background the harvesting and sending of data during the
        # normal operation of the agent.
        #
        # Takes connection options that determine how we should
        # connect to the server, and loops endlessly - typically we
        # never return from this method unless we're shutting down
        # the agent. All errors are routed through #catch_errors and
        # tracing is disabled for the duration.
        def deferred_work!(connection_options)
          catch_errors do
            ::OneApm::Agent.disable_all_tracing do
              connect(connection_options)
              if connected?
                create_and_run_event_loop
                # never reaches here unless there is a problem or
                # the agent is exiting
              else
                ::OneApm::Agent.logger.debug "No connection. Worker thread ending."
              end
            end
          end
        end

        # Builds the event loop, registers the periodic reporting handlers
        # and runs the loop (blocking the calling thread).
        #
        # Utilization data is only scheduled when :collect_utilization is
        # enabled and we are not inside a Resque child process; it is fired
        # once immediately and then every UTILIZATION_REPORT_PERIOD seconds.
        def create_and_run_event_loop
          @event_loop = create_event_loop
          @event_loop.on(:report_data) { transmit_data }
          @event_loop.on(:report_event_data) { transmit_event_data }
          @event_loop.on(:reset_log_once_keys) { ::OneApm::Agent.logger.clear_already_logged }

          @event_loop.fire_every(::OneApm::Agent.config[:data_report_period], :report_data)
          @event_loop.fire_every(report_period_for(:analytic_event_data), :report_event_data)
          @event_loop.fire_every(LOG_ONCE_KEYS_RESET_PERIOD, :reset_log_once_keys)

          if ::OneApm::Agent.config[:collect_utilization] && !in_resque_child_process?
            @event_loop.on(:report_utilization_data) { transmit_utilization_data }
            @event_loop.fire(:report_utilization_data)
            @event_loop.fire_every(UTILIZATION_REPORT_PERIOD, :report_utilization_data)
          end

          @event_loop.run
        end

        # Factory for the event loop; a separate method so it can be
        # overridden (e.g. in tests).
        def create_event_loop
          EventLoop.new
        end

        # Returns the reporting period, in seconds, for the given data type.
        #
        # Reads the :"data_report_periods.<method>" setting, falling back to
        # :data_report_period (with a warning) when it is not set. The result
        # is never lower than MIN_ALLOWED_REPORT_PERIOD; a lower configured
        # value is replaced and a warning is logged.
        def report_period_for(method)
          config_key = "data_report_periods.#{method}".to_sym
          period = ::OneApm::Agent.config[config_key]

          unless period
            period = ::OneApm::Agent.config[:data_report_period]
            ::OneApm::Agent.logger.warn("Could not find configured period for #{method}, falling back to data_report_period (#{period} s)")
          end

          if period < MIN_ALLOWED_REPORT_PERIOD
            ::OneApm::Agent.logger.warn("Configured #{config_key} was #{period}, but minimum allowed is #{MIN_ALLOWED_REPORT_PERIOD}, using #{MIN_ALLOWED_REPORT_PERIOD}.")
            period = MIN_ALLOWED_REPORT_PERIOD
          end

          period
        end

        # Stops the event loop if one was created. When :force_send is set,
        # the loop is first asked to run once more (run_once(true)) so pending
        # events are handled before stopping. Returns nil when no loop exists.
        def stop_event_loop
          return unless @event_loop

          @event_loop.run_once(true) if ::OneApm::Agent.config[:force_send]
          @event_loop.stop
        end

        private

        # A wrapper method to handle all the errors that can happen
        # in the connection and worker thread system. This
        # guarantees a no-throw (for StandardError) from the background
        # thread.
        #
        # A ForceRestartException re-runs the block after
        # #handle_force_restart; this retry is intentionally unbounded
        # because the server may request restarts at any time, and each one
        # is throttled by the sleep in #handle_force_restart.
        def catch_errors
          yield
        rescue OneApm::ForceRestartException => e
          handle_force_restart(e)
          retry
        rescue OneApm::ForceDisconnectException => e
          handle_force_disconnect(e)
        rescue => e
          handle_other_error(e)
        end

        # Handles the case where the server tells us to restart:
        # drops buffered data, resets the service's metric id cache (when a
        # service exists), marks the connection state as :pending and waits
        # 30 seconds before #catch_errors retries the connection.
        def handle_force_restart(error)
          ::OneApm::Agent.logger.debug error.message
          drop_buffered_data
          @service.reset_metric_id_cache if @service
          @connect_state = :pending
          sleep 30
        end

        # When a disconnect is requested by the server, log it as a warning
        # and call #disconnect.
        # NOTE(review): #disconnect is defined elsewhere; presumably it stops
        # the worker thread that gathers data and talks to the server — confirm.
        def handle_force_disconnect(error)
          ::OneApm::Agent.logger.warn "OneApm forced this agent to disconnect (#{error.message})"
          disconnect
        end

        # Handles an unknown error in the worker thread by logging
        # it (with backtrace) and disconnecting the agent, since we are now
        # in an unknown state.
        def handle_other_error(error)
          ::OneApm::Agent.logger.error "Unhandled error in worker thread, disconnecting this agent process:"
          # These errors are fatal (that is, they will prevent the agent from
          # reporting entirely), so we really want backtraces when they happen
          ::OneApm::Agent.logger.log_exception(:error, error)
          disconnect
        end
      end
    end
  end
end