lib/freddy/delivery.rb in freddy-1.7.0 vs lib/freddy/delivery.rb in freddy-2.0.0

- old
+ new

@@ -2,15 +2,16 @@ class Freddy class Delivery attr_reader :routing_key, :payload, :tag - def initialize(payload, metadata, routing_key, tag) + def initialize(payload, metadata, routing_key, tag, exchange) @payload = payload @metadata = metadata @routing_key = routing_key @tag = tag + @exchange = exchange end def correlation_id @metadata.correlation_id end @@ -21,22 +22,54 @@ def reply_to @metadata.reply_to end - def build_trace(operation_name, tags: {}, force_follows_from: false) - carrier = TraceCarrier.new(@metadata) - parent = OpenTracing.global_tracer.extract(OpenTracing::FORMAT_TEXT_MAP, carrier) + def in_span(force_follows_from: false, &block) + name = "#{@exchange}.#{@routing_key} receive" + kind = OpenTelemetry::Trace::SpanKind::CONSUMER + producer_context = OpenTelemetry.propagation.extract(@metadata[:headers] || {}) - references = - if !parent - [] - elsif force_follows_from - [OpenTracing::Reference.follows_from(parent)] - else - [OpenTracing::Reference.child_of(parent)] + if force_follows_from + producer_span_context = OpenTelemetry::Trace.current_span(producer_context).context + + links = [] + links << OpenTelemetry::Trace::Link.new(producer_span_context) if producer_span_context.valid? + + # In general we should start a new trace here and just link two traces + # together. But Zipkin (which we currently use) doesn't support links. + # So even though the root trace could finish before anything here + # starts executing, we'll continue with the root trace here as well. + OpenTelemetry::Context.with_current(producer_context) do + Freddy.tracer.in_span(name, attributes: span_attributes, links: links, kind: kind, &block) end + else + OpenTelemetry::Context.with_current(producer_context) do + Freddy.tracer.in_span(name, attributes: span_attributes, kind: kind, &block) + end + end + end - OpenTracing.start_active_span(operation_name, references: references, tags: tags) + private + + def span_attributes + destination_kind = @exchange == '' ? 'queue' : 'topic' + + attributes = { + 'payload.type' => (@payload[:type] || 'unknown').to_s, + OpenTelemetry::SemanticConventions::Trace::MESSAGING_SYSTEM => 'rabbitmq', + OpenTelemetry::SemanticConventions::Trace::MESSAGING_DESTINATION => @exchange, + OpenTelemetry::SemanticConventions::Trace::MESSAGING_DESTINATION_KIND => destination_kind, + OpenTelemetry::SemanticConventions::Trace::MESSAGING_RABBITMQ_ROUTING_KEY => @routing_key, + OpenTelemetry::SemanticConventions::Trace::MESSAGING_OPERATION => 'receive' + } + + # There's no correlation_id when a message was sent using + # `Freddy#deliver`. + if correlation_id + attributes[OpenTelemetry::SemanticConventions::Trace::MESSAGING_CONVERSATION_ID] = correlation_id + end + + attributes end end end