lib/elastic_apm/agent.rb in elastic-apm-1.1.0 vs lib/elastic_apm/agent.rb in elastic-apm-2.0.0
- old
+ new
@@ -3,45 +3,32 @@
require 'elastic_apm/naively_hashable'
require 'elastic_apm/context_builder'
require 'elastic_apm/error_builder'
require 'elastic_apm/stacktrace_builder'
require 'elastic_apm/error'
-require 'elastic_apm/http'
+require 'elastic_apm/transport/base'
require 'elastic_apm/spies'
-require 'elastic_apm/serializers'
-require 'elastic_apm/worker'
module ElasticAPM
# rubocop:disable Metrics/ClassLength
# @api private
class Agent
- include Log
+ include Logging
LOCK = Mutex.new
# life cycle
def self.instance # rubocop:disable Style/TrivialAccessors
@instance
end
- def self.start(config) # rubocop:disable Metrics/MethodLength
+ def self.start(config)
return @instance if @instance
config = Config.new(config) unless config.is_a?(Config)
- unless config.enabled_environments.include?(config.environment)
- unless config.disable_environment_warning?
- puts format(
- '%sNot tracking anything in "%s" env',
- Log::PREFIX, config.environment
- )
- end
-
- return
- end
-
LOCK.synchronize do
return @instance if @instance
@instance = new(config).start
end
@@ -60,100 +47,107 @@
!!@instance
end
def initialize(config)
@config = config
- @http = Http.new(config)
- @messages = Queue.new
- @pending_transactions = Queue.new
+ @transport = Transport::Base.new(config)
+ @instrumenter = Instrumenter.new(config) { |event| enqueue event }
- @instrumenter = Instrumenter.new(self)
-
+ @stacktrace_builder = StacktraceBuilder.new(config)
@context_builder = ContextBuilder.new(self)
@error_builder = ErrorBuilder.new(self)
- @stacktrace_builder = StacktraceBuilder.new(self)
end
- attr_reader :config, :messages, :pending_transactions, :instrumenter,
- :context_builder, :stacktrace_builder, :http
+ attr_reader :config, :transport, :instrumenter,
+ :stacktrace_builder, :context_builder, :error_builder
def start
- debug '[%s] Starting agent, reporting to %s', VERSION, config.server_url
+ info '[%s] Starting agent, reporting to %s', VERSION, config.server_url
- @instrumenter.start
+ transport.start
+ instrumenter.start
config.enabled_spies.each do |lib|
require "elastic_apm/spies/#{lib}"
end
self
end
def stop
- @instrumenter.stop
+ debug 'Stopping agent'
- kill_worker
+ instrumenter.stop
+ transport.stop
self
end
at_exit do
stop
end
- # queues
+ # transport
- def enqueue_transaction(transaction)
- boot_worker unless worker_running?
-
- pending_transactions.push(transaction)
-
- return unless should_flush_transactions?
-
- messages.push(Worker::FlushMsg.new)
+ def enqueue(obj)
+ transport.submit obj
end
- def should_flush_transactions?
- return true unless config.flush_interval
- return true if pending_transactions.length >= config.max_queue_size
-
- false
- end
-
- def enqueue_error(error)
- boot_worker unless worker_running?
-
- messages.push(Worker::ErrorMsg.new(error))
- end
-
# instrumentation
def current_transaction
instrumenter.current_transaction
end
- def transaction(name = nil, type = nil, context: nil, sampled: nil, &block)
- instrumenter.transaction(
+ def current_span
+ instrumenter.current_span
+ end
+
+ def start_transaction(
+ name = nil,
+ type = nil,
+ context: nil,
+ traceparent: nil
+ )
+ instrumenter.start_transaction(
name,
type,
context: context,
- sampled: sampled,
- &block
+ traceparent: traceparent
)
end
- def span(name, type = nil, backtrace: nil, context: nil, &block)
- instrumenter.span(
+ def end_transaction(result = nil)
+ instrumenter.end_transaction(result)
+ end
+
+ def start_span(name = nil, type = nil, backtrace: nil, context: nil)
+ instrumenter.start_span(
name,
type,
backtrace: backtrace,
- context: context,
- &block
+ context: context
)
end
+ def end_span
+ instrumenter.end_span
+ end
+
+ def set_tag(key, value)
+ instrumenter.set_tag(key, value)
+ end
+
+ def set_custom_context(context)
+ instrumenter.set_custom_context(context)
+ end
+
+ def set_user(user)
+ instrumenter.set_user(user)
+ end
+
def build_context(rack_env)
@context_builder.build(rack_env)
end
# errors
@@ -163,70 +157,25 @@
error = @error_builder.build_exception(
exception,
handled: handled
)
- enqueue_error error
+ enqueue error
end
def report_message(message, backtrace: nil, **attrs)
error = @error_builder.build_log(
message,
backtrace: backtrace,
**attrs
)
- enqueue_error error
+ enqueue error
end
- # context
+ # filters
- def set_tag(key, value)
- instrumenter.set_tag(key, value)
- end
-
- def set_custom_context(context)
- instrumenter.set_custom_context(context)
- end
-
- def set_user(user)
- instrumenter.set_user(user)
- end
-
def add_filter(key, callback)
- @http.filters.add(key, callback)
- end
-
- def inspect
- '<ElasticAPM::Agent>'
- end
-
- private
-
- def boot_worker
- debug 'Booting worker'
-
- @worker_thread = Thread.new do
- Worker.new(
- config,
- messages,
- pending_transactions,
- http
- ).run_forever
- end
- end
-
- def kill_worker
- messages << Worker::StopMsg.new
-
- if @worker_thread && !@worker_thread.join(5) # 5 secs
- raise 'Failed to wait for worker, not all messages sent'
- end
-
- @worker_thread = nil
- end
-
- def worker_running?
- @worker_thread && @worker_thread.alive?
+ transport.add_filter(key, callback)
end
end
# rubocop:enable Metrics/ClassLength
end