lib/plain_apm/agent.rb in plain_apm-0.9.2 vs lib/plain_apm/agent.rb in plain_apm-0.9.3
- old
+ new
@@ -4,21 +4,20 @@
module PlainApm
class Agent
include Singleton
- def self.collect(event)
- instance.collect(event)
+ def enabled?
+ @config && @config.enabled
end
- def self.start
- instance.start
- end
-
def collect(event)
- return unless @config.enabled
+ return unless enabled?
+ # stop accepting events when shutting down / shutdown.
+ return if @status != :running
+
publisher_start if @pid != $$
@events << event
end
@@ -29,11 +28,11 @@
return
end
configure
- return unless @config.enabled
+ return unless enabled?
warn("PlainAPM agent enabled.")
setup_at_exit_hooks
publisher_start
@@ -52,15 +51,17 @@
@pid = $$
# Already running
return if @publisher&.alive?
- # TODO: sized queue w/ a timeout.
- @events = Thread::Queue.new
+ # TODO: sized queue.
+ @events = PlainApm::Queue.new
# TODO: Multiple threads
@publisher = Thread.new { publisher_loop }
+
+ @status = :running
end
def setup_at_exit_hooks
at_exit { stop }
end
@@ -68,10 +69,11 @@
def publisher_shutdown
return if @publisher.nil?
# FIXME: raise in / kill the threads after a pre-determined timeout not
# to block
+ @status = :shutting_down
@events << nil
@publisher.join
@publisher = nil
end
@@ -108,20 +110,40 @@
transport = Transport.new(
endpoint: @config.endpoint,
app_key: @config.app_key
)
+ buf = []
+ timeout = 1.0
+
loop do
- event = @events.pop
+ event = @events.pop(timeout: timeout)
- Thread.exit if event.nil?
+ buf << event if event
- meta = { queue: @events.size, pid: $$, thread: Thread.current.object_id }
-
- # TODO: event serialization, batching
- _response, _error, _retriable = transport.deliver(event, meta)
-
- # TODO: retries / drops
+ case @status
+ when :running
+ # not a timeout or full buffer
+ next if !event.nil? && buf.size < 128
+ send(transport, buf)
+ buf = []
+ when :shutting_down
+ send(transport, buf)
+ buf = []
+ @status = :shutdown
+ break
+ when :shutdown
+ nil
+ else
+ # ?
+ end
end
+ end
+
+ # TODO: retries / drops
+ def send(transport, buf)
+ return if buf.empty?
+ meta = { queue: @events.size, pid: $$, thread: Thread.current.object_id, sent_at: Time.now.to_f }
+ _response, _error, _retriable = transport.deliver(buf, meta)
end
end
end