lib/opentelemetry/instrumentation/ruby_kafka/patches/consumer.rb in opentelemetry-instrumentation-ruby_kafka-0.12.0 vs lib/opentelemetry/instrumentation/ruby_kafka/patches/consumer.rb in opentelemetry-instrumentation-ruby_kafka-0.13.0

- old
+ new

@@ -20,15 +20,17 @@ 'messaging.kafka.offset' => message.offset } attributes['messaging.kafka.message_key'] = message.key if message.key - span_context = OpenTelemetry::Trace.current_span(OpenTelemetry.propagation.text.extract(message.headers)).context - link = OpenTelemetry::Trace::Link.new(span_context) if span_context.valid? - links = [link] if link + parent_context = OpenTelemetry.propagation.text.extract(message.headers) + span_context = OpenTelemetry::Trace.current_span(parent_context).context + links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? - tracer.in_span("#{message.topic} process", links: links, attributes: attributes, kind: :consumer) do - yield message + OpenTelemetry::Context.with_current(parent_context) do + tracer.in_span("#{message.topic} process", links: links, attributes: attributes, kind: :consumer) do + yield message + end end end end def each_batch(min_bytes: 1, max_bytes: 10_485_760, max_wait_time: 1, automatically_mark_as_processed: true) # rubocop:disable Metrics/AbcSize