Sha256: 52b57110c3ead7c30da30ee69e805c795295b8a5a1812f6dcb0e95c5a2723577
Contents?: true
Size: 1.79 KB
Versions: 8
Compression:
Stored size: 1.79 KB
Contents
# frozen_string_literal: true

require_relative 'periodic_sync'

module Prefab
  # Tallies log-line counts per (logger path, severity) pair and ships the
  # aggregate to the Prefab telemetry endpoint on a background interval
  # (scheduling/upload plumbing comes from the PeriodicSync mixin).
  class LogPathAggregator
    LOG = Prefab::InternalLogger.new(self)
    include Prefab::PeriodicSync

    # Used with Concurrent::Map#compute: atomically bump a possibly-nil counter.
    INCREMENT = ->(count) { (count || 0) + 1 }

    # ::Logger severity constant -> PrefabProto::Logger count-field name.
    SEVERITY_KEY = {
      ::Logger::DEBUG => 'debugs',
      ::Logger::INFO => 'infos',
      ::Logger::WARN => 'warns',
      ::Logger::ERROR => 'errors',
      ::Logger::FATAL => 'fatals'
    }.freeze

    attr_reader :data

    # @param client [Object] client used by PeriodicSync when posting telemetry
    # @param max_paths [Integer] cap on distinct (path, severity) entries tracked
    # @param sync_interval [Numeric] seconds between background flushes
    def initialize(client:, max_paths:, sync_interval:)
      @client = client
      @max_paths = max_paths
      @name = 'log_path_aggregator'
      @data = Concurrent::Map.new
      @last_data_sent = nil
      @last_request = nil

      # Kick off the background sync loop last, once state is in place.
      start_periodic_sync(sync_interval)
    end

    # Record one log line for +path+ at +severity+. Once @max_paths distinct
    # entries exist, further new paths are silently dropped to bound memory.
    def push(path, severity)
      return unless @data.size < @max_paths

      @data.compute([path, severity], &INCREMENT)
    end

    private

    # Invoked by PeriodicSync with a snapshot of counts; uploads asynchronously
    # on the shared worker pool so the sync thread is never blocked by HTTP.
    def flush(to_ship, start_at_was)
      pool.post do
        LOG.debug "Uploading stats for #{to_ship.size} paths"

        # Fold per-severity counts into one PrefabProto::Logger per path.
        loggers_by_path = to_ship.each_with_object({}) do |((path, severity), count), acc|
          proto = (acc[path] ||= PrefabProto::Logger.new)
          proto[SEVERITY_KEY[severity]] = count
          proto['logger_name'] = path
        end

        telemetry_event = PrefabProto::LoggersTelemetryEvent.new(
          loggers: loggers_by_path.values,
          start_at: start_at_was,
          end_at: Prefab::TimeHelpers.now_in_ms
        )

        events = PrefabProto::TelemetryEvents.new(
          instance_hash: instance_hash,
          events: [PrefabProto::TelemetryEvent.new(loggers: telemetry_event)]
        )

        result = post('/api/v1/telemetry', events)

        LOG.debug "Uploaded #{to_ship.size} paths: #{result.status}"
      end
    end
  end
end
Version data entries
8 entries across 8 versions & 1 rubygem