lib/plain_apm/agent.rb in plain_apm-0.9.2 vs lib/plain_apm/agent.rb in plain_apm-0.9.3

- old
+ new

@@ -4,21 +4,20 @@ module PlainApm class Agent include Singleton - def self.collect(event) - instance.collect(event) + def enabled? + @config && @config.enabled end - def self.start - instance.start - end - def collect(event) - return unless @config.enabled + return unless enabled? + # stop accepting events when shutting down / shutdown. + return if @status != :running + publisher_start if @pid != $$ @events << event end @@ -29,11 +28,11 @@ return end configure - return unless @config.enabled + return unless enabled? warn("PlainAPM agent enabled.") setup_at_exit_hooks publisher_start @@ -52,15 +51,17 @@ @pid = $$ # Already running return if @publisher&.alive? - # TODO: sized queue w/ a timeout. - @events = Thread::Queue.new + # TODO: sized queue. + @events = PlainApm::Queue.new # TODO: Multiple threads @publisher = Thread.new { publisher_loop } + + @status = :running end def setup_at_exit_hooks at_exit { stop } end @@ -68,10 +69,11 @@ def publisher_shutdown return if @publisher.nil? # FIXME: raise in / kill the threads after a pre-determined timeout not # to block + @status = :shutting_down @events << nil @publisher.join @publisher = nil end @@ -108,20 +110,40 @@ transport = Transport.new( endpoint: @config.endpoint, app_key: @config.app_key ) + buf = [] + timeout = 1.0 + loop do - event = @events.pop + event = @events.pop(timeout: timeout) - Thread.exit if event.nil? + buf << event if event - meta = { queue: @events.size, pid: $$, thread: Thread.current.object_id } - - # TODO: event serialization, batching - _response, _error, _retriable = transport.deliver(event, meta) - - # TODO: retries / drops + case @status + when :running + # not a timeout or full buffer + next if !event.nil? && buf.size < 128 + send(transport, buf) + buf = [] + when :shutting_down + send(transport, buf) + buf = [] + @status = :shutdown + break + when :shutdown + nil + else + # ? + end end + end + + # TODO: retries / drops + def send(transport, buf) + return if buf.empty? + meta = { queue: @events.size, pid: $$, thread: Thread.current.object_id, sent_at: Time.now.to_f } + _response, _error, _retriable = transport.deliver(buf, meta) end end end