lib/honeybadger/agent.rb in honeybadger-2.0.0.beta.6 vs lib/honeybadger/agent.rb in honeybadger-2.0.0.beta.7

- old
+ new

@@ -1,21 +1,28 @@ require 'forwardable' require 'honeybadger/version' require 'honeybadger/config' -require 'honeybadger/worker' require 'honeybadger/notice' require 'honeybadger/plugin' require 'honeybadger/logging' module Honeybadger - # Internal: A broker for the configuration and the worker. + # Internal: A broker for the configuration and the workers. class Agent extend Forwardable include Logging::Helper + # Internal: Sub-class thread so we have a named thread (useful for debugging in Thread.list). + class Thread < ::Thread; end + + autoload :Worker, 'honeybadger/agent/worker' + autoload :NullWorker, 'honeybadger/agent/worker' + autoload :Batch, 'honeybadger/agent/batch' + autoload :MetricsCollector, 'honeybadger/agent/metrics_collector' + class << self extend Forwardable def_delegators :callbacks, :exception_filter, :exception_fingerprint, :backtrace_filter @@ -62,11 +69,12 @@ end config.logger.info("Starting Honeybadger version #{VERSION}") load_plugins(config) @instance = new(config) - @instance.start + + true end def self.stop(*args) @instance.stop(*args) if @instance @instance = nil @@ -86,10 +94,20 @@ def self.increment(*args) self.instance ? self.instance.increment(*args) : false end + def self.flush(&block) + if self.instance + self.instance.flush(&block) + elsif !block_given? + false + else + yield + end + end + # Internal: Callback to perform after agent has been stopped at_exit. # # block - An optional block to execute. # # Returns Proc callback. @@ -110,44 +128,198 @@ else @config ||= Config.new end end + attr_reader :delay, :workers, :pid, :thread, :traces, :metrics + def initialize(config) @config = config - @worker = Worker.new(config) + @delay = config.debug? ? 10 : 60 + @mutex = Mutex.new + @pid = Process.pid + unless config.backend.kind_of?(Backend::Server) + warn('Initializing development backend: data will not be reported.') + end + + init_workers + init_traces + init_metrics + at_exit do stop self.class.at_exit.call if self.class.at_exit end end - def_delegators :@worker, :stop, :fork, :trace, :timing, :increment - + # Internal: Spawn the agent thread. This method is idempotent. + # + # Returns false if the Agent is stopped, otherwise true. def start - unless worker.backend.kind_of?(Backend::Server) - warn('Initializing development backend: data will not be reported.') + mutex.synchronize do + return false unless pid + return true if thread && thread.alive? + + debug { 'starting agent' } + + @pid = Process.pid + @thread = Thread.new { run } end - worker.start + true end + def stop(force = false) + debug { 'stopping agent' } + + mutex.synchronize do + @pid = nil + end + + # Kill the collector + Thread.kill(thread) if thread + + unless force + flush_traces + flush_metrics + end + + workers.each_pair do |key, worker| + worker.send(force ? :shutdown! : :shutdown) + end + + true + end + + def fork + # noop + end + def notice(opts) opts.merge!(callbacks: self.class.callbacks) notice = Notice.new(config, opts) if notice.ignore? debug { sprintf('ignore notice feature=notices id=%s', notice.id) } false else debug { sprintf('notice feature=notices id=%s', notice.id) } - worker.notice(notice) + workers[:notices].push(notice) notice.id end end + def trace(trace) + start + + if trace.duration > config[:'traces.threshold'] + debug { sprintf('agent adding trace duration=%s feature=traces id=%s', trace.duration.round(2), trace.id) } + mutex.synchronize { traces.push(trace) } + flush_traces if traces.flush? + true + else + debug { sprintf('agent discarding trace duration=%s feature=traces id=%s', trace.duration.round(2), trace.id) } + false + end + end + + def timing(*args, &block) + start + + mutex.synchronize { metrics.timing(*args, &block) } + flush_metrics if metrics.flush? + + true + end + + def increment(*args, &block) + start + + mutex.synchronize { metrics.increment(*args, &block) } + flush_metrics if metrics.flush? + + true + end + + # Internal: Flush the workers. See Honeybadger#flush. + # + # block - an option block which is executed before flushing data. + # + # Returns value from block if block is given, otherwise true. + def flush + return true unless block_given? + yield + ensure + flush_metrics + flush_traces + workers.values.each(&:flush) + end + private - attr_reader :worker, :config + attr_reader :config, :mutex + + def push(feature, object) + unless config.features[feature] + debug { sprintf('agent dropping feature=%s reason=ping', feature) } + return false + end + + workers[feature].push(object) + + true + end + + def run + loop { work } + rescue Exception => e + error(sprintf('error in agent thread (shutting down) class=%s message=%s at=%s', e.class, e.message.dump, e.backtrace.first.dump)) + ensure + d { sprintf('stopping agent') } + end + + def work + flush_metrics if metrics.flush? + flush_traces if traces.flush? + rescue StandardError => e + error(sprintf('error in agent thread class=%s message=%s at=%s', e.class, e.message.dump, e.backtrace.first.dump)) + ensure + sleep(delay) + end + + def init_workers + @workers = Hash.new(NullWorker.new) + workers[:notices] = Worker.new(config, :notices) + workers[:traces] = Worker.new(config, :traces) + workers[:metrics] = Worker.new(config, :metrics) + end + + def init_traces + @traces = Batch.new(config, :traces, 20, config.debug? ? 10 : 60) + end + + def init_metrics + @metrics = MetricsCollector.new(config, config.debug? ? 10 : 60) + end + + def flush_metrics + mutex.synchronize do + if (count = metrics.size) > 0 + debug { sprintf('agent flushing metrics feature=metrics count=%d', count) } + end + metrics.chunk(100, &method(:push).to_proc.curry[:metrics]) + init_metrics + end + end + + def flush_traces + mutex.synchronize do + if (count = traces.size) > 0 + debug { sprintf('agent flushing traces feature=traces count=%d', count) } + end + push(:traces, traces) unless traces.empty? + init_traces + end + end end end