lib/instana/tracing/processor.rb in instana-1.9.7 vs lib/instana/tracing/processor.rb in instana-1.10.0.slimfast

- old
+ new

@@ -1,99 +1,55 @@ require 'thread' module Instana class Processor - def initialize # The main queue before being reported to the - # host agent. Traces in this queue are complete + # host agent. Spans in this queue are complete # and ready to be sent. @queue = Queue.new - # The staging queue that holds traces that have completed - # but still have outstanding async spans. - # Traces that have been in this queue for more than - # 5 minutes are discarded. - @staging_queue = Set.new - - # No access to the @staging_queue until this lock - # is taken. - @staging_lock = Mutex.new - # This is the maximum number of spans we send to the host # agent at once. @batch_size = 3000 end - # Adds a trace to the queue to be processed and - # sent to the host agent + # Adds a Set of spans to the queue # - # @param [Trace] the trace to be added to the queue - def add(trace) + # @param [spans] - the trace to be added to the queue + def add_spans(spans) # Do a quick checkup on our background thread. if ::Instana.agent.collect_thread.nil? || !::Instana.agent.collect_thread.alive? ::Instana.agent.spawn_background_thread end - - # ::Instana.logger.debug("Queuing completed trace id: #{trace.id}") - @queue.push(trace) + spans.each { |span| @queue.push(span)} end - # Adds a trace to the staging queue. + # Adds a span to the span queue # - # @param [Trace] the trace to be added to the queue - def stage(trace) - ::Instana.logger.debug("Staging incomplete trace id: #{trace.id}") - @staging_queue.add(trace) + # @param [Trace] - the trace to be added to the queue + def add_span(span) + # Do a quick checkup on our background thread. + if ::Instana.agent.collect_thread.nil? || !::Instana.agent.collect_thread.alive? + ::Instana.agent.spawn_background_thread + end + @queue.push(span) end - # This will run through the staged traces (if any) to find - # completed or timed out incompleted traces. Completed traces will - # be added to the main @queue. Timed out traces will be discarded - # - def process_staged - @staging_lock.synchronize { - if @staging_queue.size > 0 - @staging_queue.delete_if do |t| - if t.complete? - ::Instana.logger.debug("Moving staged complete trace to main queue: #{t.id}") - add(t) - true - elsif t.discard? - ::Instana.logger.debug("Discarding trace with uncompleted async spans over 5 mins old. id: #{t.id}") - true - else - false - end - end - end - } - end - ## # send # - # Sends all traces in @queue to the host - # agent + # Sends all traces in @queue to the host agent # # FIXME: Add limits checking here in regards to: # - Max HTTP Post size # - Out of control/growing queue # - Prevent another run of the timer while this is running # def send return if @queue.empty? || ENV.key?('INSTANA_TEST') - size = @queue.size - if size > 200 - Instana.logger.debug "Trace queue is #{size}" - end - - # Scan for any staged but incomplete traces that have now - # completed. - process_staged - # Retrieve all spans for queued traces spans = queued_spans # Report spans in batches batch = spans.shift(@batch_size) @@ -115,95 +71,24 @@ return [] if @queue.empty? spans = [] until @queue.empty? do # Non-blocking pop; ignore exception - trace = @queue.pop(true) rescue nil - trace.spans.each do |s| - spans << s.raw + span = @queue.pop(true) rescue nil + if span + spans << span.raw end end spans end - # Retrieves all of the traces that are in @queue. - # Note that traces retrieved with this method are removed - # entirely from the queue. - # - # @return [Array] An array of [Trace] or empty - # - def queued_traces - return [] if @queue.empty? - - traces = [] - until @queue.empty? do - # Non-blocking pop; ignore exception - traces << @queue.pop(true) rescue nil - end - traces - end - - # Retrieves a all staged traces from the staging queue. Staged traces - # are traces that have completed but may have outstanding - # asynchronous spans. - # - # @return [Array] - # - def staged_traces - traces = nil - @staging_lock.synchronize { - traces = @staging_queue.to_a - @staging_queue.clear - } - traces - end - - # Retrieves a single staged trace from the staging queue. Staged traces - # are traces that have completed but may have outstanding - # asynchronous spans. - # - # @param trace_id [Integer] the Trace ID to be searched for - # - def staged_trace(trace_id) - candidate = nil - @staging_lock.synchronize { - @staging_queue.each do |trace| - if trace.id == trace_id - candidate = trace - break - end - end - } - unless candidate - ::Instana.logger.debug("Couldn't find staged trace with trace_id: #{trace_id}") - end - candidate - end - - # Get the number traces currently in the queue - # - # @return [Integer] the queue size - # - def queue_count - @queue.size - end - - # Get the number traces currently in the staging queue - # - # @return [Integer] the queue size - # - def staged_count - @staging_queue.size - end - - # Removes all traces from the @queue and @staging_queue. Used in the + # Removes all traces from the @queue. Used in the # test suite to reset state. # def clear! until @queue.empty? do # Non-blocking pop; ignore exception @queue.pop(true) rescue nil end - @staging_queue.clear end end end