# frozen_string_literal: true

require_relative '../core/environment/ext'
require_relative '../core/environment/socket'

require_relative 'correlation'
require_relative 'event'
require_relative 'flush'
require_relative 'context_provider'
require_relative 'sampling/all_sampler'
require_relative 'sampling/rule_sampler'
require_relative 'sampling/priority_sampler'
require_relative 'sampling/span/sampler'
require_relative 'span_operation'
require_relative 'trace_digest'
require_relative 'trace_operation'
require_relative 'writer'

module Datadog
  module Tracing
    # A {Datadog::Tracing::Tracer} keeps track of the time spent by an application processing a single operation. For
    # example, a trace can be used to track the entire time spent processing a complicated web request.
    # Even though the request may require multiple resources and machines to handle the request, all
    # of these function calls and sub-requests would be encapsulated within a single trace.
    class Tracer
      attr_reader \
        :trace_flush,
        :provider,
        :sampler,
        :span_sampler,
        :tags

      attr_accessor \
        :default_service,
        :enabled,
        :writer

      # Initialize a new {Datadog::Tracing::Tracer} used to create, sample and submit spans that measure the
      # time of sections of code.
      #
      # @param trace_flush [Datadog::Tracing::TraceFlush] responsible for flushing spans from the execution context
      # @param context_provider [Datadog::Tracing::DefaultContextProvider] ensures different
      #   execution contexts have distinct traces
      # @param default_service [String] A fallback value for {Datadog::Tracing::Span#service}, as spans without
      #   service are rejected
      # @param enabled [Boolean] set if the tracer submits or not spans to the local agent
      # @param sampler [Datadog::Tracing::Sampler] a tracer sampler, responsible for filtering out spans when needed
      # @param span_sampler [Datadog::Tracing::Sampling::Span::Sampler] sampler invoked for every finished span
      # @param tags [Hash] default tags added to all spans
      # @param writer [Datadog::Tracing::Writer] consumes traces returned by the provided +trace_flush+
      def initialize(
        trace_flush: Flush::Finished.new,
        context_provider: DefaultContextProvider.new,
        default_service: Core::Environment::Ext::FALLBACK_SERVICE_NAME,
        enabled: true,
        sampler: Sampling::PrioritySampler.new(
          base_sampler: Sampling::AllSampler.new,
          post_sampler: Sampling::RuleSampler.new
        ),
        span_sampler: Sampling::Span::Sampler.new,
        tags: {},
        writer: Writer.new
      )
        @trace_flush = trace_flush
        @default_service = default_service
        @enabled = enabled
        @provider = context_provider
        @sampler = sampler
        @span_sampler = span_sampler
        @tags = tags
        @writer = writer
      end

      # Return a {Datadog::Tracing::SpanOperation span_op} and {Datadog::Tracing::TraceOperation trace_op}
      # that will trace an operation called `name`.
      #
      # You could trace your code using a do-block like:
      #
      # ```
      # tracer.trace('web.request') do |span_op, trace_op|
      #   span_op.service = 'my-web-site'
      #   span_op.resource = '/'
      #   span_op.set_tag('http.method', request.request_method)
      #   do_something()
      # end
      # ```
      #
      # The {#trace} method can also be used without a block in this way:
      # ```
      # span_op = tracer.trace('web.request', service: 'my-web-site')
      # do_something()
      # span_op.finish()
      # ```
      #
      # Remember that in this case, calling {Datadog::Tracing::SpanOperation#finish} is mandatory.
      #
      # When a Trace is started, {#trace} will store the created span; subsequent spans will
      # become its children and will inherit some properties:
      # ```
      # parent = tracer.trace('parent')   # has no parent span
      # child  = tracer.trace('child')    # is a child of 'parent'
      # child.finish()
      # parent.finish()
      # parent2 = tracer.trace('parent2') # has no parent span
      # parent2.finish()
      # ```
      #
      # If the tracer is disabled, or resolving the trace raises a `StandardError`, the code still
      # runs (or a span is still returned), using objects that are not attached to any active trace.
      #
      # @param [String] name {Datadog::Tracing::Span} operation name.
      #   See {https://docs.datadoghq.com/tracing/guide/configuring-primary-operation/ Primary Operations in Services}.
      # @param [Datadog::Tracing::TraceDigest] continue_from continue a trace from a {Datadog::Tracing::TraceDigest}.
      #   Used for linking traces that are executed asynchronously.
      # @param [Proc] on_error a block that overrides error handling behavior for this operation.
      # @param [String] resource the resource this span refers, or `name` if it's missing
      # @param [String] service the service name for this span.
      # @param [Time] start_time time which the span should have started. Ignored when a block is given.
      # @param [Hash] tags extra tags which should be added to the span.
      # @param [String] type the type of the span. See {Datadog::Tracing::Metadata::Ext::AppTypes}.
      # @param [Integer] id the id of the new span.
      # @return [Object] If a block is provided, returns the result of the block execution.
      # @return [Datadog::Tracing::SpanOperation] If no block is provided, returns the active,
      #   unfinished {Datadog::Tracing::SpanOperation}.
      # @yield Optional block where the newly created {Datadog::Tracing::SpanOperation} captures the execution.
      # @yieldparam [Datadog::Tracing::SpanOperation] span_op the newly created and active
      #   [Datadog::Tracing::SpanOperation]
      # @yieldparam [Datadog::Tracing::TraceOperation] trace_op the active [Datadog::Tracing::TraceOperation]
      # rubocop:disable Metrics/MethodLength
      def trace(
        name,
        continue_from: nil,
        on_error: nil,
        resource: nil,
        service: nil,
        start_time: nil,
        tags: nil,
        type: nil,
        id: nil,
        &block
      )
        return skip_trace(name, &block) unless enabled

        # Resolve the trace
        begin
          context = call_context
          active_trace = context.active_trace
          trace = if continue_from || active_trace.nil?
                    start_trace(continue_from: continue_from)
                  else
                    active_trace
                  end
        rescue StandardError => e
          Datadog.logger.debug { "Failed to trace: #{e}" }

          # Tracing failed: fallback and run code without tracing.
          return skip_trace(name, &block)
        end

        # Activate and start the trace
        if block
          context.activate!(trace) do
            start_span(
              name,
              on_error: on_error,
              resource: resource,
              service: service,
              start_time: start_time,
              tags: tags,
              type: type,
              _trace: trace,
              id: id,
              &block
            )
          end
        else
          # Setup trace activation/deactivation
          manual_trace_activation!(context, trace)

          # Return the new span
          start_span(
            name,
            on_error: on_error,
            resource: resource,
            service: service,
            start_time: start_time,
            tags: tags,
            type: type,
            _trace: trace,
            id: id
          )
        end
      end
      # rubocop:enable Metrics/MethodLength

      # Set the given key / value tag pair at the tracer level. These tags will be
      # appended to each span created by the tracer. Keys are converted to strings
      # with `to_s`; values are stored as given.
      #
      # The tracer-level tags Hash is replaced by a new merged Hash rather than mutated in place.
      #
      # @example
      #   tracer.set_tags('env' => 'prod', 'component' => 'core')
      # @param [Hash] tags the tags to merge into the tracer-level tags
      # @return [Hash] the resulting tracer-level tags
      def set_tags(tags)
        string_tags = tags.collect { |k, v| [k.to_s, v] }.to_h
        @tags = @tags.merge(string_tags)
      end

      # The active, unfinished trace, representing the current instrumentation context.
      #
      # The active trace is fiber-local.
      #
      # @param [Thread] key Thread to retrieve trace from. Defaults to current thread. For internal use only.
      # @return [Datadog::Tracing::TraceSegment] the active trace
      # @return [nil] if no trace is active
      def active_trace(key = nil)
        call_context(key).active_trace
      end

      # The active, unfinished span, representing the currently instrumented application section.
      #
      # The active span belongs to an {.active_trace}.
      #
      # @param [Thread] key Thread to retrieve trace from. Defaults to current thread. For internal use only.
      # @return [Datadog::Tracing::SpanOperation] the active span
      # @return [nil] if no trace is active, and thus no span is active
      def active_span(key = nil)
        trace = active_trace(key)
        trace.active_span if trace
      end

      # Information about the currently active trace.
      #
      # The most common use cases are tagging log messages and metrics.
      #
      # When no trace is active, a blank {Datadog::Tracing::Correlation::Identifier} is returned.
      #
      # @param [Thread] key Thread to retrieve trace from. Defaults to current thread. For internal use only.
      # @return [Datadog::Tracing::Correlation::Identifier] correlation object
      def active_correlation(key = nil)
        trace = active_trace(key)

        return Datadog::Tracing::Correlation::Identifier.new unless trace

        trace.to_correlation
      end

      # Setup a new trace to continue from where another
      # trace left off.
      #
      # Used to continue distributed or async traces.
      #
      # @param [Datadog::Tracing::TraceDigest] digest continue from the {Datadog::Tracing::TraceDigest}.
      # @param [Thread] key Thread to retrieve trace from. Defaults to current thread. For internal use only.
      # @return [Object] If a block is provided, the result of the block execution.
      # @return [Datadog::Tracing::TraceOperation] If no block, the active {Datadog::Tracing::TraceOperation}.
      # @yield Optional block where this {#continue_trace!} `digest` scope is active.
      #   If no block, the `digest` remains active after {#continue_trace!} returns.
      def continue_trace!(digest, key = nil, &block)
        # Only accept {TraceDigest} as a digest.
        # Otherwise, create a new execution context.
        digest = nil unless digest.is_a?(TraceDigest)

        # Start a new trace from the digest
        context = call_context(key)
        original_trace = active_trace(key)
        trace = start_trace(continue_from: digest)

        # If block hasn't been given; we need to manually deactivate
        # this trace. Subscribe to the trace finished event to do this.
        subscribe_trace_deactivation!(context, trace, original_trace) unless block

        context.activate!(trace, &block)
      end

      # Sample a span, tagging the trace as appropriate.
      #
      # Delegates to the configured sampler. Any `StandardError` it raises is swallowed
      # and reported through a log-only-once warning.
      #
      # @param [Datadog::Tracing::TraceOperation] trace_op the trace to sample
      def sample_trace(trace_op)
        @sampler.sample!(trace_op)
      rescue StandardError => e
        SAMPLE_TRACE_LOG_ONLY_ONCE.run do
          Datadog.logger.warn { "Failed to sample trace: #{e.class.name} #{e} at #{Array(e.backtrace).first}" }
        end
      end

      # The event published (by {#write}) each time a trace is written. Lazily created.
      #
      # @!visibility private
      # TODO: make this private
      def trace_completed
        @trace_completed ||= TraceCompleted.new
      end

      # Triggered whenever a trace is completed
      class TraceCompleted < Tracing::Event
        def initialize
          super(:trace_completed)
        end

        # NOTE: Ignore Rubocop rule. This definition allows for
        #       description of and constraints on arguments.
        # rubocop:disable Lint/UselessMethodDefinition
        def publish(trace)
          super(trace)
        end
        # rubocop:enable Lint/UselessMethodDefinition
      end

      # Stops the writer, when the tracer is enabled and a writer is set.
      # It's useful to ensure that the Trace Buffer is properly flushed before
      # shutting down the application.
      #
      # @example
      #   tracer.trace('operation_name', service: 'rake_tasks') do |span_op|
      #     span_op.set_tag('task.name', 'script')
      #   end
      #
      #   tracer.shutdown!
      def shutdown!
        return unless @enabled

        @writer.stop if @writer
      end

      private

      # Return the current active {Context} for this traced execution. This method is
      # automatically called when calling Tracer.trace or Tracer.start_span,
      # but it can be used in the application code during manual instrumentation.
      #
      # This method makes use of a {ContextProvider} that is automatically set during the tracer
      # initialization, or while using a library instrumentation.
      #
      # @param [Thread] key Thread to retrieve tracer from. Defaults to current thread.
      def call_context(key = nil)
        @provider.context(key)
      end

      # Builds a {TraceOperation} owned by this tracer.
      #
      # When a digest is given, its trace id, origin, span id, sampling priority, distributed tags
      # and trace state are carried over to the new trace. Otherwise a fresh trace without a
      # remote parent is built.
      #
      # @param [Datadog::Tracing::TraceDigest, nil] digest the digest to continue from, if any
      # @return [TraceOperation]
      def build_trace(digest = nil)
        # Resolve hostname if configured
        hostname = Core::Environment::Socket.hostname if Datadog.configuration.tracing.report_hostname
        # Treat an empty hostname the same as a missing one.
        hostname = hostname && !hostname.empty? ? hostname : nil

        if digest
          TraceOperation.new(
            hostname: hostname,
            profiling_enabled: profiling_enabled,
            id: digest.trace_id,
            origin: digest.trace_origin,
            parent_span_id: digest.span_id,
            sampling_priority: digest.trace_sampling_priority,
            # Distributed tags are just regular trace tags with special meaning to Datadog
            tags: digest.trace_distributed_tags,
            trace_state: digest.trace_state,
            trace_state_unknown_fields: digest.trace_state_unknown_fields,
            remote_parent: digest.span_remote,
            tracer: self
          )
        else
          TraceOperation.new(
            hostname: hostname,
            profiling_enabled: profiling_enabled,
            remote_parent: false,
            tracer: self
          )
        end
      end

      # Subscribes this tracer to the events of the given trace:
      # * before a span starts, the trace and the span fall back to the default service
      #   when they have none;
      # * when a span finishes, the span is sampled and the trace is flushed.
      #
      # @param [TraceOperation] trace_op the trace whose events are subscribed to
      def bind_trace_events!(trace_op)
        events = trace_op.send(:events)

        events.span_before_start.subscribe do |event_span_op, event_trace_op|
          event_trace_op.service ||= @default_service
          event_span_op.service ||= @default_service
        end

        events.span_finished.subscribe do |event_span, event_trace_op|
          sample_span(event_trace_op, event_span)
          flush_trace(event_trace_op)
        end
      end

      # Creates a new TraceOperation, with events bound to this Tracer instance.
      # @return [TraceOperation]
      def start_trace(continue_from: nil)
        # Build a new trace using digest if provided.
        trace = build_trace(continue_from)

        # Bind trace events: sample trace, set default service, flush spans.
        bind_trace_events!(trace)

        trace
      end

      # Creates a span in `_trace`, or in a newly started trace when `_trace` is nil.
      #
      # With a block, the block is measured by the trace and `start_time` is ignored.
      # Without a block, the span is built, started at `start_time` and returned;
      # the caller is then responsible for finishing it.
      #
      # The span receives the tracer-level tags merged with `tags` (see {#resolve_tags}).
      #
      # @return [Object] with a block, whatever `trace.measure` returns
      # @return [Datadog::Tracing::SpanOperation] without a block, the started span
      # rubocop:disable Lint/UnderscorePrefixedVariableName
      def start_span(
        name,
        continue_from: nil,
        on_error: nil,
        resource: nil,
        service: nil,
        start_time: nil,
        tags: nil,
        type: nil,
        _trace: nil,
        id: nil,
        &block
      )
        trace = _trace || start_trace(continue_from: continue_from)

        events = SpanOperation::Events.new

        if block
          # Ignore start time if a block has been given
          trace.measure(
            name,
            events: events,
            on_error: on_error,
            resource: resource,
            service: service,
            tags: resolve_tags(tags, service),
            type: type,
            id: id,
            &block
          )
        else
          # Return the new span
          span = trace.build_span(
            name,
            events: events,
            on_error: on_error,
            resource: resource,
            service: service,
            start_time: start_time,
            tags: resolve_tags(tags, service),
            type: type,
            id: id
          )

          span.start(start_time)
          span
        end
      end
      # rubocop:enable Lint/UnderscorePrefixedVariableName

      # Computes the tags for a new span: the tracer-level tags overlaid with the provided `tags`.
      #
      # The version tag is dropped when `service` is given and differs from the default service.
      #
      # The returned Hash is always a fresh object: neither the caller's `tags` nor the
      # tracer-level tags are mutated.
      #
      # @param [Hash, nil] tags span-specific tags; they take precedence over tracer-level tags
      # @param [String, nil] service the service of the span being created
      # @return [Hash] the tags to set on the span
      def resolve_tags(tags, service)
        merged_tags = if @tags.any? && tags
                        # Combine default tags with provided tags,
                        # preferring provided tags.
                        @tags.merge(tags)
                      else
                        # Use provided tags or default tags if none.
                        # Copy in both cases: the version tag may be deleted below, and that must
                        # not leak into the caller's Hash (which may also be frozen) nor into @tags.
                        (tags || @tags).dup
                      end

        # Remove version tag if service is not the default service
        if merged_tags.key?(Core::Environment::Ext::TAG_VERSION) && service && service != @default_service
          merged_tags.delete(Core::Environment::Ext::TAG_VERSION)
        end

        merged_tags
      end

      # Manually activate and deactivate the trace, when the span completes.
      def manual_trace_activation!(context, trace)
        # Get the original trace to restore
        original_trace = context.active_trace

        # Setup the deactivation callback
        subscribe_trace_deactivation!(context, trace, original_trace)

        # Activate the trace
        # Skip this, if it would have no effect.
        context.activate!(trace) unless trace == original_trace
      end

      # Reactivate the original trace when trace completes
      def subscribe_trace_deactivation!(context, trace, original_trace)
        # Don't override this event if it's set.
        # The original event should reactivate the original trace correctly.
        #
        # This happens when multiple manually-activated spans are created:
        # ```ruby
        # tracer.trace('parent') do
        #   span1 = tracer.trace('first') # Registers trace deactivation back to `parent` span.
        #   span2 = tracer.trace('second') # Tries to register trace deactivation back to `first`, which is not correct.
        # end
        # ```
        return if trace.send(:events).trace_finished.deactivate_trace_subscribed?

        trace.send(:events).trace_finished.subscribe_deactivate_trace do
          context.activate!(original_trace)
        end
      end

      SAMPLE_TRACE_LOG_ONLY_ONCE = Core::Utils::OnlyOnce.new
      private_constant :SAMPLE_TRACE_LOG_ONLY_ONCE

      # Runs the span sampler for a finished span.
      #
      # Any `StandardError` raised by the sampler is swallowed and reported through a
      # log-only-once warning.
      #
      # @param [TraceOperation] trace_op the trace the span belongs to
      # @param span the finished span
      def sample_span(trace_op, span)
        @span_sampler.sample!(trace_op, span)
      rescue StandardError => e
        SAMPLE_SPAN_LOG_ONLY_ONCE.run do
          Datadog.logger.warn { "Failed to sample span: #{e.class.name} #{e} at #{Array(e.backtrace).first}" }
        end
      end

      SAMPLE_SPAN_LOG_ONLY_ONCE = Core::Utils::OnlyOnce.new
      private_constant :SAMPLE_SPAN_LOG_ONLY_ONCE

      # Flush finished spans from the trace buffer, send them to writer.
      #
      # The trace is sampled first if it has no sampling priority yet. Errors raised while
      # consuming or writing the trace are swallowed and reported through a log-only-once warning.
      def flush_trace(trace_op)
        sample_trace(trace_op) unless trace_op.sampling_priority

        begin
          trace = @trace_flush.consume!(trace_op)
          write(trace) if trace && !trace.empty?
        rescue StandardError => e
          FLUSH_TRACE_LOG_ONLY_ONCE.run do
            Datadog.logger.warn { "Failed to flush trace: #{e.class.name} #{e} at #{Array(e.backtrace).first}" }
          end
        end
      end

      FLUSH_TRACE_LOG_ONLY_ONCE = Core::Utils::OnlyOnce.new
      private_constant :FLUSH_TRACE_LOG_ONLY_ONCE

      # Send the trace to the writer to enqueue the spans list in the agent
      # sending queue, then publish the {#trace_completed} event.
      #
      # Does nothing when there is no trace or no writer.
      def write(trace)
        return unless trace && @writer

        if Datadog.configuration.diagnostics.debug
          Datadog.logger.debug { "Writing #{trace.length} spans (enabled: #{@enabled})\n#{trace.spans.pretty_inspect}" }
        end

        @writer.write(trace)
        trace_completed.publish(trace)
      end

      # Runs the instrumented code without a tracer-managed trace: the span and trace handed
      # out here are standalone objects, not bound to this tracer's events or context.
      #
      # @param [String] name the operation name for the standalone span
      # @return [Object] with a block, the result of the block
      # @return [Datadog::Tracing::SpanOperation] without a block, the standalone span
      #
      # TODO: Make these dummy objects singletons to preserve memory.
      def skip_trace(name)
        span = SpanOperation.new(name)

        if block_given?
          trace = TraceOperation.new
          yield(span, trace)
        else
          span
        end
      end

      # Whether the profiler is loaded and reports itself as enabled.
      #
      # NOTE: because of `||=`, only a truthy result is cached; a `false` result is
      # re-evaluated on every call.
      #
      # @return [Boolean]
      def profiling_enabled
        @profiling_enabled ||=
          !!(defined?(Datadog::Profiling) && Datadog::Profiling.respond_to?(:enabled?) && Datadog::Profiling.enabled?)
      end
    end
  end
end