lib/ddtrace/workers.rb in ddtrace-0.9.2 vs lib/ddtrace/workers.rb in ddtrace-0.10.0

- old
+ new

@@ -10,22 +10,24 @@ # with the +stop()+ method and can start with the +start()+ method. class AsyncTransport DEFAULT_TIMEOUT = 5 BACK_OFF_RATIO = 1.2 BACK_OFF_MAX = 5 + SHUTDOWN_TIMEOUT = 1 - attr_reader :trace_buffer, :service_buffer, :shutting_down + attr_reader :trace_buffer, :service_buffer def initialize(transport, buff_size, trace_task, service_task, interval) @trace_task = trace_task @service_task = service_task @flush_interval = interval @back_off = interval @trace_buffer = TraceBuffer.new(buff_size) @service_buffer = TraceBuffer.new(buff_size) @transport = transport - @shutting_down = false + @shutdown = ConditionVariable.new + @mutex = Mutex.new @worker = nil @run = false end @@ -60,51 +62,36 @@ end end # Start the timer execution. def start - return if @run - @run = true - @worker = Thread.new() do - Datadog::Tracer.log.debug("Starting thread in the process: #{Process.pid}") - - while @run - @back_off = callback_traces ? @flush_interval : [@back_off * BACK_OFF_RATIO, BACK_OFF_MAX].min - - callback_services - - sleep(@back_off) if @run - end + @mutex.synchronize do + return if @run + @run = true + Tracer.log.debug("Starting thread in the process: #{Process.pid}") + @worker = Thread.new { perform } end end - # Stop the timer execution. Tasks already in the queue will be executed. + # Closes all available queues and waits for the trace and service buffer to flush def stop - @run = false - end + @mutex.synchronize do + return unless @run - # Closes all available queues and waits for the trace and service buffer to flush - def shutdown! - return false if @shutting_down - @shutting_down = true - @trace_buffer.close - @service_buffer.close - sleep(0.1) - timeout_time = Time.now + DEFAULT_TIMEOUT - while (!@trace_buffer.empty? || !@service_buffer.empty?) && Time.now <= timeout_time - sleep(0.05) - Datadog::Tracer.log.debug('Waiting for the buffers to clear before exiting') + @trace_buffer.close + @service_buffer.close + @run = false + @shutdown.signal end - stop + join - @shutting_down = false true end # Block until executor shutdown is complete or until timeout seconds have passed. def join - @worker.join(5) + @worker.join(SHUTDOWN_TIMEOUT) end # Enqueue an item in the trace internal buffer. This operation is thread-safe # because uses the +TraceBuffer+ data structure. def enqueue_trace(trace) @@ -113,9 +100,24 @@ # Enqueue an item in the service internal buffer. This operation is thread-safe. def enqueue_service(service) return if service == {} # no use to send this, not worth it @service_buffer.push(service) + end + + private + + def perform + loop do + @back_off = callback_traces ? @flush_interval : [@back_off * BACK_OFF_RATIO, BACK_OFF_MAX].min + + callback_services + + @mutex.synchronize do + return if !@run && @trace_buffer.empty? && @service_buffer.empty? + @shutdown.wait(@mutex, @back_off) + end + end end end end end