lib/elastic_apm/agent.rb in elastic-apm-1.1.0 vs lib/elastic_apm/agent.rb in elastic-apm-2.0.0

- old
+ new

@@ -3,45 +3,32 @@ require 'elastic_apm/naively_hashable' require 'elastic_apm/context_builder' require 'elastic_apm/error_builder' require 'elastic_apm/stacktrace_builder' require 'elastic_apm/error' -require 'elastic_apm/http' +require 'elastic_apm/transport/base' require 'elastic_apm/spies' -require 'elastic_apm/serializers' -require 'elastic_apm/worker' module ElasticAPM # rubocop:disable Metrics/ClassLength # @api private class Agent - include Log + include Logging LOCK = Mutex.new # life cycle def self.instance # rubocop:disable Style/TrivialAccessors @instance end - def self.start(config) # rubocop:disable Metrics/MethodLength + def self.start(config) return @instance if @instance config = Config.new(config) unless config.is_a?(Config) - unless config.enabled_environments.include?(config.environment) - unless config.disable_environment_warning? - puts format( - '%sNot tracking anything in "%s" env', - Log::PREFIX, config.environment - ) - end - - return - end - LOCK.synchronize do return @instance if @instance @instance = new(config).start end @@ -60,100 +47,107 @@ !!@instance end def initialize(config) @config = config - @http = Http.new(config) - @messages = Queue.new - @pending_transactions = Queue.new + @transport = Transport::Base.new(config) + @instrumenter = Instrumenter.new(config) { |event| enqueue event } - @instrumenter = Instrumenter.new(self) - + @stacktrace_builder = StacktraceBuilder.new(config) @context_builder = ContextBuilder.new(self) @error_builder = ErrorBuilder.new(self) - @stacktrace_builder = StacktraceBuilder.new(self) end - attr_reader :config, :messages, :pending_transactions, :instrumenter, - :context_builder, :stacktrace_builder, :http + attr_reader :config, :transport, :instrumenter, + :stacktrace_builder, :context_builder, :error_builder def start - debug '[%s] Starting agent, reporting to %s', VERSION, config.server_url + info '[%s] Starting agent, reporting to %s', VERSION, config.server_url - @instrumenter.start + transport.start + instrumenter.start config.enabled_spies.each do |lib| require "elastic_apm/spies/#{lib}" end self end def stop - @instrumenter.stop + debug 'Stopping agent' - kill_worker + instrumenter.stop + transport.stop self end at_exit do stop end - # queues + # transport - def enqueue_transaction(transaction) - boot_worker unless worker_running? - - pending_transactions.push(transaction) - - return unless should_flush_transactions? - - messages.push(Worker::FlushMsg.new) + def enqueue(obj) + transport.submit obj end - def should_flush_transactions? - return true unless config.flush_interval - return true if pending_transactions.length >= config.max_queue_size - - false - end - - def enqueue_error(error) - boot_worker unless worker_running? - - messages.push(Worker::ErrorMsg.new(error)) - end - # instrumentation def current_transaction instrumenter.current_transaction end - def transaction(name = nil, type = nil, context: nil, sampled: nil, &block) - instrumenter.transaction( + def current_span + instrumenter.current_span + end + + def start_transaction( + name = nil, + type = nil, + context: nil, + traceparent: nil + ) + instrumenter.start_transaction( name, type, context: context, - sampled: sampled, - &block + traceparent: traceparent ) end - def span(name, type = nil, backtrace: nil, context: nil, &block) - instrumenter.span( + def end_transaction(result = nil) + instrumenter.end_transaction(result) + end + + def start_span(name = nil, type = nil, backtrace: nil, context: nil) + instrumenter.start_span( name, type, backtrace: backtrace, - context: context, - &block + context: context ) end + def end_span + instrumenter.end_span + end + + def set_tag(key, value) + instrumenter.set_tag(key, value) + end + + def set_custom_context(context) + instrumenter.set_custom_context(context) + end + + def set_user(user) + instrumenter.set_user(user) + end + def build_context(rack_env) @context_builder.build(rack_env) end # errors @@ -163,70 +157,25 @@ error = @error_builder.build_exception( exception, handled: handled ) - enqueue_error error + enqueue error end def report_message(message, backtrace: nil, **attrs) error = @error_builder.build_log( message, backtrace: backtrace, **attrs ) - enqueue_error error + enqueue error end - # context + # filters - def set_tag(key, value) - instrumenter.set_tag(key, value) - end - - def set_custom_context(context) - instrumenter.set_custom_context(context) - end - - def set_user(user) - instrumenter.set_user(user) - end - def add_filter(key, callback) - @http.filters.add(key, callback) - end - - def inspect - '<ElasticAPM::Agent>' - end - - private - - def boot_worker - debug 'Booting worker' - - @worker_thread = Thread.new do - Worker.new( - config, - messages, - pending_transactions, - http - ).run_forever - end - end - - def kill_worker - messages << Worker::StopMsg.new - - if @worker_thread && !@worker_thread.join(5) # 5 secs - raise 'Failed to wait for worker, not all messages sent' - end - - @worker_thread = nil - end - - def worker_running? - @worker_thread && @worker_thread.alive? + transport.add_filter(key, callback) end end # rubocop:enable Metrics/ClassLength end