lib/ddtrace/workers.rb in ddtrace-0.21.2 vs lib/ddtrace/workers.rb in ddtrace-0.22.0

- old
+ new

@@ -1,8 +1,9 @@ require 'time' require 'ddtrace/buffer' +require 'ddtrace/runtime/metrics' module Datadog module Workers # Asynchronous worker that executes a +Send()+ operation after given # seconds. Under the hood, it uses +Concurrent::TimerTask+ so that the thread @@ -12,35 +13,47 @@ DEFAULT_TIMEOUT = 5 BACK_OFF_RATIO = 1.2 BACK_OFF_MAX = 5 SHUTDOWN_TIMEOUT = 1 - attr_reader :trace_buffer, :service_buffer + attr_reader \ + :service_buffer, + :trace_buffer - def initialize(transport, buff_size, trace_task, service_task, interval) - @trace_task = trace_task - @service_task = service_task + def initialize(options = {}) + @transport = options[:transport] + + # Callbacks + @trace_task = options[:on_trace] + @service_task = options[:on_service] + @runtime_metrics_task = options[:on_runtime_metrics] + + # Intervals + interval = options.fetch(:interval, 1) @flush_interval = interval @back_off = interval - @trace_buffer = TraceBuffer.new(buff_size) - @service_buffer = TraceBuffer.new(buff_size) - @transport = transport + + # Buffers + buffer_size = options.fetch(:buffer_size, 100) + @trace_buffer = TraceBuffer.new(buffer_size) + @service_buffer = TraceBuffer.new(buffer_size) + + # Threading @shutdown = ConditionVariable.new @mutex = Mutex.new - @worker = nil @run = false end # Callback function that process traces and executes the +send_traces()+ method. def callback_traces return true if @trace_buffer.empty? begin - traces = @trace_buffer.pop() + traces = @trace_buffer.pop traces = Pipeline.process!(traces) - @trace_task.call(traces, @transport) + @trace_task.call(traces, @transport) unless @trace_task.nil? rescue StandardError => e # ensures that the thread will not die because of an exception. # TODO[manu]: findout the reason and reschedule the send if it's not # a fatal exception Datadog::Tracer.log.error("Error during traces flush: dropped #{traces.length} items. Cause: #{e}") @@ -51,19 +64,25 @@ def callback_services return true if @service_buffer.empty? begin services = @service_buffer.pop() - @service_task.call(services[0], @transport) + @service_task.call(services[0], @transport) unless @service_task.nil? rescue StandardError => e # ensures that the thread will not die because of an exception. # TODO[manu]: findout the reason and reschedule the send if it's not # a fatal exception Datadog::Tracer.log.error("Error during services flush: dropped #{services.length} items. Cause: #{e}") end end + def callback_runtime_metrics + @runtime_metrics_task.call unless @runtime_metrics_task.nil? + rescue StandardError => e + Datadog::Tracer.log.error("Error during runtime metrics flush. Cause: #{e}") + end + # Start the timer execution. def start @mutex.synchronize do return if @run @run = true @@ -111,9 +130,10 @@ def perform loop do @back_off = flush_data ? @flush_interval : [@back_off * BACK_OFF_RATIO, BACK_OFF_MAX].min callback_services + callback_runtime_metrics @mutex.synchronize do return if !@run && @trace_buffer.empty? && @service_buffer.empty? @shutdown.wait(@mutex, @back_off) if @run # do not wait when shutting down end