lib/instana/tracing/processor.rb in instana-0.12.1 vs lib/instana/tracing/processor.rb in instana-0.13.1
- old
+ new
@@ -2,21 +2,66 @@
module Instana
class Processor
def initialize
+ # The main queue before being reported to the
+ # host agent. Traces in this queue are complete
+ # and ready to be sent.
@queue = Queue.new
+
+ # The staging queue that holds traces that have completed
+ # but still have outstanding async spans.
+ # Traces that have been in this queue for more than
+ # 5 minutes are discarded.
+ @staging_queue = Set.new
+
+ # No access to the @staging_queue until this lock
+ # is taken.
+ @staging_lock = Mutex.new
end
# Adds a trace to the queue to be processed and
# sent to the host agent
#
# @param [Trace] the trace to be added to the queue
def add(trace)
+ ::Instana.logger.trace("Queuing completed trace id: #{trace.id}")
@queue.push(trace)
end
+ # Adds a trace to the staging queue.
+ #
+ # @param [Trace] the trace to be added to the queue
+ def stage(trace)
+ ::Instana.logger.trace("Staging incomplete trace id: #{trace.id}")
+ @staging_queue.add(trace)
+ end
+
+ # This will run through the staged traces (if any) to find
+ # completed or timed out incompleted traces. Completed traces will
+ # be added to the main @queue. Timed out traces will be discarded
+ #
+ def process_staged
+ @staging_lock.synchronize {
+ if @staging_queue.size > 0
+ @staging_queue.delete_if do |t|
+ if t.complete?
+ ::Instana.logger.trace("Moving staged complete trace to main queue: #{t.id}")
+ add(t)
+ true
+ elsif t.discard?
+ ::Instana.logger.debug("Discarding trace with uncompleted async spans over 5 mins old. id: #{t.id}")
+ true
+ else
+ false
+ end
+ end
+ end
+ }
+ end
+
##
# send
#
# Sends all traces in @queue to the host
# agent
@@ -25,32 +70,35 @@
# - Max HTTP Post size
# - Out of control/growing queue
# - Prevent another run of the timer while this is running
#
def send
- return if @queue.empty?
+ return if @queue.empty? || ENV['INSTANA_GEM_TEST']
size = @queue.size
if size > 100
Instana.logger.debug "Trace queue is #{size}"
end
- ::Instana.agent.report_spans(queued_spans)
- end
+ # Scan for any staged but incomplete traces that have now
+ # completed.
+ process_staged
- # Get the number traces currently in the queue
- #
- def queue_count
- @queue.size
+ # Retrieve all spans for queued traces
+ spans = queued_spans
+
+ ::Instana.agent.report_spans(spans)
end
# Retrieves all of the traces in @queue and returns
# the sum of their raw spans.
# This is used by Processor::send and in the test suite.
# Note that traces retrieved with this method are removed
# entirely from the queue.
#
+ # @return [Array] An array of [Span] or empty
+ #
def queued_spans
return [] if @queue.empty?
spans = []
until @queue.empty? do
@@ -65,10 +113,12 @@
# Retrieves all of the traces that are in @queue.
# Note that traces retrieved with this method are removed
# entirely from the queue.
#
+ # @return [Array] An array of [Trace] or empty
+ #
def queued_traces
return [] if @queue.empty?
traces = []
until @queue.empty? do
@@ -76,18 +126,71 @@
traces << @queue.pop(true) rescue nil
end
traces
end
- # Removes all traces from the @queue. Used in the
- # test suite.
+ # Retrieves a all staged traces from the staging queue. Staged traces
+ # are traces that have completed but may have outstanding
+ # asynchronous spans.
#
- def clear!
- return [] if @queue.empty?
+ # @return [Array]
+ #
+ def staged_traces
+ traces = nil
+ @staging_lock.synchronize {
+ traces = @staging_queue.to_a
+ @staging_queue.clear
+ }
+ traces
+ end
+ # Retrieves a single staged trace from the staging queue. Staged traces
+ # are traces that have completed but may have outstanding
+ # asynchronous spans.
+ #
+ # @param ids [Hash] the Trace ID and Span ID in the form of
+ # :trace_id => 12345
+ # :span_id => 12345
+ #
+ def staged_trace(ids)
+ candidate = nil
+ @staging_lock.synchronize {
+ @staging_queue.each do |trace|
+ if trace.id == ids[:trace_id]
+ candidate = trace
+ end
+ end
+ }
+ unless candidate
+ ::Instana.logger.trace("Couldn't find staged trace with trace_id: #{ids[:trace_id]}")
+ end
+ candidate
+ end
+
+ # Get the number traces currently in the queue
+ #
+ # @return [Integer] the queue size
+ #
+ def queue_count
+ @queue.size
+ end
+
+ # Get the number traces currently in the staging queue
+ #
+ # @return [Integer] the queue size
+ #
+ def staged_count
+ @staging_queue.size
+ end
+
+ # Removes all traces from the @queue and @staging_queue. Used in the
+ # test suite to reset state.
+ #
+ def clear!
until @queue.empty? do
# Non-blocking pop; ignore exception
@queue.pop(true) rescue nil
end
+ @staging_queue.clear
end
end
end