# encoding: utf-8

module OneApm
  module Agent
    class Agent
      # Mixin that creates the agent's data containers, harvests them and
      # ships the harvested items to the collector service held in @service.
      #
      # The including class is expected to provide @service, @events and a
      # `harvest_lock` method; none of them are defined in this module.
      module ContainerDataManager
        attr_reader :stats_engine
        attr_reader :transaction_sampler
        attr_reader :sql_sampler
        attr_reader :agent_command_router
        attr_reader :error_collector
        attr_reader :custom_event_aggregator

        # Merges +data+ into the container registered for +endpoint+.
        #
        # Nil or empty data is ignored. Any StandardError raised while merging
        # (including the NoMethodError produced when +endpoint+ has no
        # container) is logged at error level and swallowed.
        def merge_data_for_endpoint(endpoint, data)
          if data && !data.empty?
            container_for_endpoint(endpoint).merge!(data)
          end
        rescue => e
          OneApm::Manager.logger.error("Error while merging #{endpoint} data from child: ", e)
        end

        # Resets every buffering container, discarding whatever they hold.
        def drop_buffered_data
          @stats_engine.reset!
          @error_collector.reset!
          @transaction_sampler.reset!
          @transaction_event_aggregator.reset!
          @custom_event_aggregator.reset!
          @sql_sampler.reset!
        end

        private

        # Instantiates all data containers.
        #
        # NOTE(review): the +events+ argument is never read; the aggregator and
        # the command router are built from the @events instance variable
        # instead. Presumably callers pass @events, making both equivalent -
        # confirm against the call site before switching to the parameter.
        def init_containers(events)
          @stats_engine                 = OneApm::Collector::StatsEngine.new
          @transaction_sampler          = OneApm::Collector::TransactionSampler.new
          @sql_sampler                  = OneApm::Collector::SqlSampler.new
          @error_collector              = OneApm::Collector::ErrorCollector.new
          @custom_event_aggregator      = OneApm::Collector::CustomEventAggregator.new
          @transaction_event_aggregator = OneApm::Collector::TransactionEventAggregator.new(@events)
          @utilization_data             = OneApm::Collector::UtilizationData.new
          @agent_command_router         = OneApm::Collector::AgentCommandRouter.new(@events)
        end

        # Maps an endpoint symbol to the container that buffers its data.
        # Returns nil for an endpoint that has no mapping.
        def container_for_endpoint(endpoint)
          case endpoint
          when :metric_data             then @stats_engine
          when :transaction_sample_data then @transaction_sampler
          when :sql_trace_data          then @sql_sampler
          when :error_data              then @error_collector
          when :custom_event_data       then @custom_event_aggregator
          when :analytic_event_data     then @transaction_event_aggregator
          end
        end

        # Harvests data from the given container, sends it to the named endpoint
        # on the service, and automatically merges back in upon a recoverable
        # failure.
        #
        # The given container should respond to:
        #
        #  #harvest!
        #    returns an enumerable collection of data items to be sent to the
        #    collector.
        #
        #  #reset!
        #    drop any stored data and reset to a clean state.
        #
        #  #merge!(items)
        #    merge the given items back into the internal buffer of the
        #    container, so that they may be harvested again later.
        #
        def harvest_and_send_from_container(container, endpoint)
          items = harvest_from_container(container, endpoint)
          # A nil harvest is treated like an empty one so that a single
          # misbehaving container cannot abort the harvest with NoMethodError.
          return if items.nil? || items.empty?

          send_data_to_endpoint(endpoint, items, container)
        end

        # Returns whatever container.harvest! returns. If harvesting raises a
        # StandardError the failure is logged, the container is reset and an
        # empty array is returned.
        def harvest_from_container(container, endpoint)
          items = []
          begin
            items = container.harvest!
          rescue => e
            OneApm::Manager.logger.error("Failed to harvest #{endpoint} data, resetting. Error: ", e)
            container.reset!
          end
          items
        end

        # Sends +items+ by invoking the method named +endpoint+ on @service.
        #
        # Failure handling, in rescue order:
        # * ForceRestartException / ForceDisconnectException - re-raised.
        # * SerializationError, UnrecoverableServerException - logged, items
        #   are discarded.
        # * ServerConnectionException and any other StandardError - logged,
        #   items are merged back into +container+ for a later attempt.
        def send_data_to_endpoint(endpoint, items, container)
          OneApm::Manager.logger.debug("Sending #{items.size} items to #{endpoint}")
          begin
            @service.send(endpoint, items)
          rescue ForceRestartException, ForceDisconnectException
            raise
          rescue SerializationError => e
            OneApm::Manager.logger.warn("Failed to serialize data for #{endpoint}, discarding. Error: ", e)
          rescue UnrecoverableServerException => e
            OneApm::Manager.logger.warn("#{endpoint} data was rejected by remote service, discarding. Error: ", e)
          rescue ServerConnectionException => e
            log_remote_unavailable(endpoint, e)
            container.merge!(items)
          rescue => e
            OneApm::Manager.logger.info("Unable to send #{endpoint} data, will try again later. Error: ", e)
            container.merge!(items)
          end
        end

        # Calls BusyCalculator.harvest_busy, then harvests and sends the
        # stats engine's data to :metric_data.
        def harvest_and_send_timeslice_data
          OneApm::Agent::BusyCalculator.harvest_busy
          harvest_and_send_from_container(@stats_engine, :metric_data)
        end

        # Harvests the SQL sampler and sends it to :sql_trace_data.
        def harvest_and_send_slowest_sql
          harvest_and_send_from_container(@sql_sampler, :sql_trace_data)
        end

        # Harvests the transaction sampler and sends it to
        # :transaction_sample_data.
        def harvest_and_send_transaction_traces
          harvest_and_send_from_container(@transaction_sampler, :transaction_sample_data)
        end

        # Harvests the agent command router and sends it to :profile_data.
        def harvest_and_send_for_agent_commands
          harvest_and_send_from_container(@agent_command_router, :profile_data)
        end

        # Harvests the error collector and sends it to :error_data.
        def harvest_and_send_errors
          harvest_and_send_from_container(@error_collector, :error_data)
        end

        # Harvests and sends transaction events, then custom events.
        def harvest_and_send_analytic_event_data
          harvest_and_send_from_container(@transaction_event_aggregator, :analytic_event_data)
          harvest_and_send_from_container(@custom_event_aggregator, :custom_event_data)
        end

        # Harvests the utilization data container and sends it to
        # :utilization_data.
        def harvest_and_send_utilization_data
          harvest_and_send_from_container(@utilization_data, :utilization_data)
        end

        # Runs a full harvest cycle while holding harvest_lock.
        def transmit_data
          harvest_lock.synchronize do
            transmit_data_already_locked
          end
        end

        # Notifies :agent_restart listeners when RestartMonitor reports that a
        # restart is needed.
        def detect_config
          @events.notify(:agent_restart) if OneApm::Agent::RestartMonitor.need_restart?
        end

        # Harvests and sends event data, timed under "TransactionEvent".
        def transmit_event_data
          transmit_single_data_type(:harvest_and_send_analytic_event_data, "TransactionEvent")
        end

        # Harvests and sends utilization data, timed under "UtilizationData".
        def transmit_utilization_data
          transmit_single_data_type(:harvest_and_send_utilization_data, "UtilizationData")
        end

        # Invokes +harvest_method+ on self inside a service session while
        # holding harvest_lock. The elapsed wall-clock time is always recorded
        # as "Supportability/<supportability_name>Harvest", even when the
        # harvest raises.
        def transmit_single_data_type(harvest_method, supportability_name)
          now = Time.now

          msg = "Sending #{harvest_method.to_s.gsub("harvest_and_send_", "")} to OneApm Service"
          OneApm::Manager.logger.debug msg

          harvest_lock.synchronize do
            @service.session do # use http keep-alive
              self.send(harvest_method)
            end
          end
        ensure
          duration = (Time.now - now).to_f
          OneApm::Manager.record_metric("Supportability/#{supportability_name}Harvest", duration)
        end

        # This method is expected to only be called with the harvest_lock already held
        #
        # Fires :before_harvest, then sends errors, transaction traces, slow
        # SQL, timeslice metrics and agent command data within one service
        # session. Database connections are closed and the elapsed time is
        # recorded as 'Supportability/Harvest' whether or not the harvest
        # raised.
        def transmit_data_already_locked
          now = Time.now
          OneApm::Manager.logger.debug "Sending data to OneApm Service"

          @events.notify(:before_harvest)
          @service.session do
            harvest_and_send_errors
            harvest_and_send_transaction_traces
            harvest_and_send_slowest_sql
            harvest_and_send_timeslice_data

            check_for_and_handle_agent_commands
            harvest_and_send_for_agent_commands
          end
        ensure
          OneApm::Agent::Database.close_connections
          duration = (Time.now - now).to_f
          OneApm::Manager.record_metric('Supportability/Harvest', duration)
        end

        # Delegates to the agent command router. Force restart/disconnect
        # requests are re-raised; connection failures and any other
        # StandardError are logged and swallowed.
        def check_for_and_handle_agent_commands
          begin
            @agent_command_router.check_for_and_handle_agent_commands
          rescue ForceRestartException, ForceDisconnectException
            raise
          rescue ServerConnectionException => e
            log_remote_unavailable(:get_agent_commands, e)
          rescue => e
            OneApm::Manager.logger.info("Error during check_for_and_handle_agent_commands, will retry later: ", e)
          end
        end

        # Logs at debug level that +endpoint+ could not be reached and records
        # the overall and per-endpoint remote_unavailable supportability
        # metrics with a value of 0.0.
        def log_remote_unavailable(endpoint, e)
          OneApm::Manager.logger.debug("Unable to send #{endpoint} data, will try again later. Error: ", e)
          OneApm::Manager.record_metric("Supportability/remote_unavailable", 0.0)
          OneApm::Manager.record_metric("Supportability/remote_unavailable/#{endpoint}", 0.0)
        end
      end
    end
  end
end