lib/ddtrace/workers.rb in ddtrace-0.21.2 vs lib/ddtrace/workers.rb in ddtrace-0.22.0
- old
+ new
@@ -1,8 +1,9 @@
require 'time'
require 'ddtrace/buffer'
+require 'ddtrace/runtime/metrics'
module Datadog
module Workers
# Asynchronous worker that executes a +Send()+ operation after given
# seconds. Under the hood, it uses +Concurrent::TimerTask+ so that the thread
@@ -12,35 +13,47 @@
DEFAULT_TIMEOUT = 5
BACK_OFF_RATIO = 1.2
BACK_OFF_MAX = 5
SHUTDOWN_TIMEOUT = 1
- attr_reader :trace_buffer, :service_buffer
+ attr_reader \
+ :service_buffer,
+ :trace_buffer
- def initialize(transport, buff_size, trace_task, service_task, interval)
- @trace_task = trace_task
- @service_task = service_task
+ def initialize(options = {})
+ @transport = options[:transport]
+
+ # Callbacks
+ @trace_task = options[:on_trace]
+ @service_task = options[:on_service]
+ @runtime_metrics_task = options[:on_runtime_metrics]
+
+ # Intervals
+ interval = options.fetch(:interval, 1)
@flush_interval = interval
@back_off = interval
- @trace_buffer = TraceBuffer.new(buff_size)
- @service_buffer = TraceBuffer.new(buff_size)
- @transport = transport
+
+ # Buffers
+ buffer_size = options.fetch(:buffer_size, 100)
+ @trace_buffer = TraceBuffer.new(buffer_size)
+ @service_buffer = TraceBuffer.new(buffer_size)
+
+ # Threading
@shutdown = ConditionVariable.new
@mutex = Mutex.new
-
@worker = nil
@run = false
end
# Callback function that process traces and executes the +send_traces()+ method.
def callback_traces
return true if @trace_buffer.empty?
begin
- traces = @trace_buffer.pop()
+ traces = @trace_buffer.pop
traces = Pipeline.process!(traces)
- @trace_task.call(traces, @transport)
+ @trace_task.call(traces, @transport) unless @trace_task.nil?
rescue StandardError => e
# ensures that the thread will not die because of an exception.
# TODO[manu]: findout the reason and reschedule the send if it's not
# a fatal exception
Datadog::Tracer.log.error("Error during traces flush: dropped #{traces.length} items. Cause: #{e}")
@@ -51,19 +64,25 @@
def callback_services
return true if @service_buffer.empty?
begin
services = @service_buffer.pop()
- @service_task.call(services[0], @transport)
+ @service_task.call(services[0], @transport) unless @service_task.nil?
rescue StandardError => e
# ensures that the thread will not die because of an exception.
# TODO[manu]: findout the reason and reschedule the send if it's not
# a fatal exception
Datadog::Tracer.log.error("Error during services flush: dropped #{services.length} items. Cause: #{e}")
end
end
+ def callback_runtime_metrics
+ @runtime_metrics_task.call unless @runtime_metrics_task.nil?
+ rescue StandardError => e
+ Datadog::Tracer.log.error("Error during runtime metrics flush. Cause: #{e}")
+ end
+
# Start the timer execution.
def start
@mutex.synchronize do
return if @run
@run = true
@@ -111,9 +130,10 @@
def perform
loop do
@back_off = flush_data ? @flush_interval : [@back_off * BACK_OFF_RATIO, BACK_OFF_MAX].min
callback_services
+ callback_runtime_metrics
@mutex.synchronize do
return if !@run && @trace_buffer.empty? && @service_buffer.empty?
@shutdown.wait(@mutex, @back_off) if @run # do not wait when shutting down
end