# encoding: utf-8
require 'one_apm/support/event_buffer/sized_buffer'

module OneApm
  module Collector
    # Collects custom events into a capacity-limited buffer between harvests.
    #
    # Each stored event is a two-element array: a hash holding the event type
    # and a Unix timestamp, followed by the result of `event_params` for the
    # caller-supplied attributes. Every access to the buffer goes through
    # @lock.
    class CustomEventAggregator
      include OneApm::Coerce

      OA_TYPE                 = 'type'.freeze
      OA_TIMESTAMP            = 'timestamp'.freeze
      OA_EVENT_PARAMS_CTX     = 'recording custom event'.freeze
      # Anchored with \A and \z so the WHOLE string has to match. With ^ and $
      # (which match per line) a type such as "ok\nanything" passed validation.
      OA_EVENT_TYPE_REGEX     = /\A[a-zA-Z0-9:_ ]+\z/.freeze
      OA_DEFAULT_CAPACITY_KEY = :'custom_insights_events.max_samples_stored'

      # Builds the buffer using the configured capacity and subscribes to
      # later changes of that setting.
      def initialize
        @lock         = Mutex.new
        @buffer       = OneApm::Agent::SampledBuffer.new(OneApm::Manager.config[OA_DEFAULT_CAPACITY_KEY])
        # Memoizes the frozen string form of every type key looked up.
        @type_strings = Hash.new { |hash, key| hash[key] = key.to_s.freeze }
        register_config_callbacks
      end

      # Registers a config callback that resizes the buffer whenever the
      # capacity setting changes.
      def register_config_callbacks
        OneApm::Manager.config.register_callback(OA_DEFAULT_CAPACITY_KEY) do |max_samples|
          OneApm::Manager.logger.debug "CustomEventAggregator max_samples set to #{max_samples}"
          @lock.synchronize do
            @buffer.capacity = max_samples
          end
        end
      end

      # Records one custom event.
      #
      # @param type [#to_s] event type name; its string form must consist only
      #   of letters, digits, ':', '_' and spaces
      # @param attributes the event attributes, passed through `event_params`
      #   (presumably provided by OneApm::Coerce -- not visible in this file)
      # @return false when the type name is invalid; otherwise whatever
      #   `@buffer.append` returns
      def record(type, attributes)
        type = @type_strings[type]

        unless type =~ OA_EVENT_TYPE_REGEX
          note_dropped_event(type)
          return false
        end

        # Build the event outside the lock so the critical section stays short.
        event = [
          { OA_TYPE => type, OA_TIMESTAMP => Time.now.to_i },
          event_params(attributes, OA_EVENT_PARAMS_CTX)
        ]

        @lock.synchronize { @buffer.append(event) }
      end

      # Drains the buffer, reports seen/sent/dropped counts, and returns the
      # drained events.
      #
      # @return [Array] the events that were in the buffer
      def harvest!
        results    = []
        drop_count = 0

        @lock.synchronize do
          results.concat(@buffer.to_a)
          drop_count += @buffer.num_dropped
          @buffer.reset!
        end

        note_dropped_events(results.size, drop_count)
        results
      end

      # Logs a warning when events were dropped and records the seen, sent and
      # dropped supportability metric counts.
      #
      # @param captured_count [Integer] number of events harvested
      # @param dropped_count [Integer] number of events dropped
      def note_dropped_events(captured_count, dropped_count)
        total_count = captured_count + dropped_count

        if dropped_count > 0
          OneApm::Manager.logger.warn("Dropped #{dropped_count} custom events out of #{total_count}.")
        end

        engine = OneApm::Manager.agent.stats_engine
        engine.tl_record_supportability_metric_count("Events/Customer/Seen", total_count)
        engine.tl_record_supportability_metric_count("Events/Customer/Sent", captured_count)
        engine.tl_record_supportability_metric_count("Events/Customer/Dropped", dropped_count)
      end

      # Appends each of the given events to the buffer under a single lock
      # acquisition.
      #
      # @param events [Enumerable] events to append
      def merge!(events)
        @lock.synchronize do
          events.each do |event|
            @buffer.append(event)
          end
        end
      end

      # Resets the buffer.
      def reset!
        @lock.synchronize { @buffer.reset! }
      end

      # Logs (once per type name) that an event was rejected for having an
      # invalid type, and counts it as dropped on the buffer.
      #
      # @param type [String] the rejected type name
      def note_dropped_event(type)
        OneApm::Manager.logger.log_once(:warn, "dropping_event_of_type:#{type}",
                                        "Invalid event type name '#{type}', not recording.")
        # Every other @buffer access in this class holds @lock; do the same
        # here. `record` calls this before taking the lock, so the
        # non-reentrant Mutex is not acquired twice.
        @lock.synchronize { @buffer.note_dropped }
      end
    end
  end
end