lib/ddtrace/workers.rb in ddtrace-0.9.2 vs lib/ddtrace/workers.rb in ddtrace-0.10.0
- old
+ new
@@ -10,22 +10,24 @@
# with the +stop()+ method and can start with the +start()+ method.
class AsyncTransport
DEFAULT_TIMEOUT = 5
BACK_OFF_RATIO = 1.2
BACK_OFF_MAX = 5
+ SHUTDOWN_TIMEOUT = 1
- attr_reader :trace_buffer, :service_buffer, :shutting_down
+ attr_reader :trace_buffer, :service_buffer
def initialize(transport, buff_size, trace_task, service_task, interval)
@trace_task = trace_task
@service_task = service_task
@flush_interval = interval
@back_off = interval
@trace_buffer = TraceBuffer.new(buff_size)
@service_buffer = TraceBuffer.new(buff_size)
@transport = transport
- @shutting_down = false
+ @shutdown = ConditionVariable.new
+ @mutex = Mutex.new
@worker = nil
@run = false
end
@@ -60,51 +62,36 @@
end
end
# Start the timer execution.
def start
- return if @run
- @run = true
- @worker = Thread.new() do
- Datadog::Tracer.log.debug("Starting thread in the process: #{Process.pid}")
-
- while @run
- @back_off = callback_traces ? @flush_interval : [@back_off * BACK_OFF_RATIO, BACK_OFF_MAX].min
-
- callback_services
-
- sleep(@back_off) if @run
- end
+ @mutex.synchronize do
+ return if @run
+ @run = true
+ Tracer.log.debug("Starting thread in the process: #{Process.pid}")
+ @worker = Thread.new { perform }
end
end
- # Stop the timer execution. Tasks already in the queue will be executed.
+ # Closes all available queues and waits for the trace and service buffer to flush
def stop
- @run = false
- end
+ @mutex.synchronize do
+ return unless @run
- # Closes all available queues and waits for the trace and service buffer to flush
- def shutdown!
- return false if @shutting_down
- @shutting_down = true
- @trace_buffer.close
- @service_buffer.close
- sleep(0.1)
- timeout_time = Time.now + DEFAULT_TIMEOUT
- while (!@trace_buffer.empty? || !@service_buffer.empty?) && Time.now <= timeout_time
- sleep(0.05)
- Datadog::Tracer.log.debug('Waiting for the buffers to clear before exiting')
+ @trace_buffer.close
+ @service_buffer.close
+ @run = false
+ @shutdown.signal
end
- stop
+
join
- @shutting_down = false
true
end
# Block until executor shutdown is complete or until timeout seconds have passed.
def join
- @worker.join(5)
+ @worker.join(SHUTDOWN_TIMEOUT)
end
# Enqueue an item in the trace internal buffer. This operation is thread-safe
# because uses the +TraceBuffer+ data structure.
def enqueue_trace(trace)
@@ -113,9 +100,24 @@
# Enqueue an item in the service internal buffer. This operation is thread-safe.
def enqueue_service(service)
return if service == {} # no use to send this, not worth it
@service_buffer.push(service)
+ end
+
+ private
+
+ def perform
+ loop do
+ @back_off = callback_traces ? @flush_interval : [@back_off * BACK_OFF_RATIO, BACK_OFF_MAX].min
+
+ callback_services
+
+ @mutex.synchronize do
+ return if !@run && @trace_buffer.empty? && @service_buffer.empty?
+ @shutdown.wait(@mutex, @back_off)
+ end
+ end
end
end
end
end