lib/honeybadger/agent.rb in honeybadger-2.0.0.beta.6 vs lib/honeybadger/agent.rb in honeybadger-2.0.0.beta.7
- old
+ new
@@ -1,21 +1,28 @@
require 'forwardable'
require 'honeybadger/version'
require 'honeybadger/config'
-require 'honeybadger/worker'
require 'honeybadger/notice'
require 'honeybadger/plugin'
require 'honeybadger/logging'
module Honeybadger
- # Internal: A broker for the configuration and the worker.
+ # Internal: A broker for the configuration and the workers.
class Agent
extend Forwardable
include Logging::Helper
+ # Internal: Sub-class thread so we have a named thread (useful for debugging in Thread.list).
+ class Thread < ::Thread; end
+
+ autoload :Worker, 'honeybadger/agent/worker'
+ autoload :NullWorker, 'honeybadger/agent/worker'
+ autoload :Batch, 'honeybadger/agent/batch'
+ autoload :MetricsCollector, 'honeybadger/agent/metrics_collector'
+
class << self
extend Forwardable
def_delegators :callbacks, :exception_filter, :exception_fingerprint, :backtrace_filter
@@ -62,11 +69,12 @@
end
config.logger.info("Starting Honeybadger version #{VERSION}")
load_plugins(config)
@instance = new(config)
- @instance.start
+
+ true
end
def self.stop(*args)
@instance.stop(*args) if @instance
@instance = nil
@@ -86,10 +94,20 @@
def self.increment(*args)
self.instance ? self.instance.increment(*args) : false
end
+ def self.flush(&block)
+ if self.instance
+ self.instance.flush(&block)
+ elsif !block_given?
+ false
+ else
+ yield
+ end
+ end
+
# Internal: Callback to perform after agent has been stopped at_exit.
#
# block - An optional block to execute.
#
# Returns Proc callback.
@@ -110,44 +128,198 @@
else
@config ||= Config.new
end
end
+ attr_reader :delay, :workers, :pid, :thread, :traces, :metrics
+
def initialize(config)
@config = config
- @worker = Worker.new(config)
+ @delay = config.debug? ? 10 : 60
+ @mutex = Mutex.new
+ @pid = Process.pid
+ unless config.backend.kind_of?(Backend::Server)
+ warn('Initializing development backend: data will not be reported.')
+ end
+
+ init_workers
+ init_traces
+ init_metrics
+
at_exit do
stop
self.class.at_exit.call if self.class.at_exit
end
end
- def_delegators :@worker, :stop, :fork, :trace, :timing, :increment
-
+ # Internal: Spawn the agent thread. This method is idempotent.
+ #
+ # Returns false if the Agent is stopped, otherwise true.
def start
- unless worker.backend.kind_of?(Backend::Server)
- warn('Initializing development backend: data will not be reported.')
+ mutex.synchronize do
+ return false unless pid
+ return true if thread && thread.alive?
+
+ debug { 'starting agent' }
+
+ @pid = Process.pid
+ @thread = Thread.new { run }
end
- worker.start
+ true
end
+ def stop(force = false)
+ debug { 'stopping agent' }
+
+ mutex.synchronize do
+ @pid = nil
+ end
+
+ # Kill the collector
+ Thread.kill(thread) if thread
+
+ unless force
+ flush_traces
+ flush_metrics
+ end
+
+ workers.each_pair do |key, worker|
+ worker.send(force ? :shutdown! : :shutdown)
+ end
+
+ true
+ end
+
+ def fork
+ # noop
+ end
+
def notice(opts)
opts.merge!(callbacks: self.class.callbacks)
notice = Notice.new(config, opts)
if notice.ignore?
debug { sprintf('ignore notice feature=notices id=%s', notice.id) }
false
else
debug { sprintf('notice feature=notices id=%s', notice.id) }
- worker.notice(notice)
+ workers[:notices].push(notice)
notice.id
end
end
+ def trace(trace)
+ start
+
+ if trace.duration > config[:'traces.threshold']
+ debug { sprintf('agent adding trace duration=%s feature=traces id=%s', trace.duration.round(2), trace.id) }
+ mutex.synchronize { traces.push(trace) }
+ flush_traces if traces.flush?
+ true
+ else
+ debug { sprintf('agent discarding trace duration=%s feature=traces id=%s', trace.duration.round(2), trace.id) }
+ false
+ end
+ end
+
+ def timing(*args, &block)
+ start
+
+ mutex.synchronize { metrics.timing(*args, &block) }
+ flush_metrics if metrics.flush?
+
+ true
+ end
+
+ def increment(*args, &block)
+ start
+
+ mutex.synchronize { metrics.increment(*args, &block) }
+ flush_metrics if metrics.flush?
+
+ true
+ end
+
+ # Internal: Flush the workers. See Honeybadger#flush.
+ #
+ # block - an option block which is executed before flushing data.
+ #
+ # Returns value from block if block is given, otherwise true.
+ def flush
+ return true unless block_given?
+ yield
+ ensure
+ flush_metrics
+ flush_traces
+ workers.values.each(&:flush)
+ end
+
private
- attr_reader :worker, :config
+ attr_reader :config, :mutex
+
+ def push(feature, object)
+ unless config.features[feature]
+ debug { sprintf('agent dropping feature=%s reason=ping', feature) }
+ return false
+ end
+
+ workers[feature].push(object)
+
+ true
+ end
+
+ def run
+ loop { work }
+ rescue Exception => e
+ error(sprintf('error in agent thread (shutting down) class=%s message=%s at=%s', e.class, e.message.dump, e.backtrace.first.dump))
+ ensure
+ d { sprintf('stopping agent') }
+ end
+
+ def work
+ flush_metrics if metrics.flush?
+ flush_traces if traces.flush?
+ rescue StandardError => e
+ error(sprintf('error in agent thread class=%s message=%s at=%s', e.class, e.message.dump, e.backtrace.first.dump))
+ ensure
+ sleep(delay)
+ end
+
+ def init_workers
+ @workers = Hash.new(NullWorker.new)
+ workers[:notices] = Worker.new(config, :notices)
+ workers[:traces] = Worker.new(config, :traces)
+ workers[:metrics] = Worker.new(config, :metrics)
+ end
+
+ def init_traces
+ @traces = Batch.new(config, :traces, 20, config.debug? ? 10 : 60)
+ end
+
+ def init_metrics
+ @metrics = MetricsCollector.new(config, config.debug? ? 10 : 60)
+ end
+
+ def flush_metrics
+ mutex.synchronize do
+ if (count = metrics.size) > 0
+ debug { sprintf('agent flushing metrics feature=metrics count=%d', count) }
+ end
+ metrics.chunk(100, &method(:push).to_proc.curry[:metrics])
+ init_metrics
+ end
+ end
+
+ def flush_traces
+ mutex.synchronize do
+ if (count = traces.size) > 0
+ debug { sprintf('agent flushing traces feature=traces count=%d', count) }
+ end
+ push(:traces, traces) unless traces.empty?
+ init_traces
+ end
+ end
end
end