# frozen_string_literal: true require "singleton" module PlainApm class Agent include Singleton def enabled? @config && @config.enabled end def collect(event) return unless enabled? # stop accepting events when shutting down / shutdown. return if @status != :running publisher_start if @pid != $$ @events << event end def start if !defined?(@started) @started = true else return end configure return unless enabled? warn("PlainAPM agent enabled.") setup_at_exit_hooks publisher_start install_hooks end private def stop uninstall_hooks publisher_shutdown end def publisher_start # Store PID for fork detection @pid = $$ # Already running return if @publisher&.alive? # TODO: sized queue. @events = PlainApm::Queue.new # TODO: Multiple threads @publisher = Thread.new { publisher_loop } @status = :running end def setup_at_exit_hooks at_exit { stop } end def publisher_shutdown return if @publisher.nil? # FIXME: raise in / kill the threads after a pre-determined timeout not # to block @status = :shutting_down @events << nil @publisher.join @publisher = nil end def configure @config = Config.new end def install_hooks hooks.each(&:install) end def uninstall_hooks hooks.each(&:uninstall) end def hooks @hooks ||= [ Hooks::ActionMailer, Hooks::ActionPack, Hooks::ActionView, Hooks::ActiveJob, Hooks::ActiveRecord, Hooks::ActiveSupport, Hooks::ErrorReporter, Hooks::Manual ].map(&:new) end ## # Run a background thread that pops events from the queue and posts them to # the target server. def publisher_loop # Have the thread keep it's own connection. transport = Transport.new( endpoint: @config.endpoint, app_key: @config.app_key ) buf = [] timeout = 1.0 loop do event = @events.pop(timeout: timeout) buf << event if event case @status when :running # not a timeout or full buffer next if !event.nil? && buf.size < 128 send(transport, buf) buf = [] when :shutting_down send(transport, buf) buf = [] @status = :shutdown break when :shutdown nil else # ? end end end # TODO: retries / drops def send(transport, buf) return if buf.empty? meta = { queue: @events.size, pid: $$, thread: Thread.current.object_id, sent_at: Time.now.to_f } _response, _error, _retriable = transport.deliver(buf, meta) end end end