lib/opentelemetry/instrumentation/ruby_kafka/patches/consumer.rb in opentelemetry-instrumentation-ruby_kafka-0.12.0 vs lib/opentelemetry/instrumentation/ruby_kafka/patches/consumer.rb in opentelemetry-instrumentation-ruby_kafka-0.13.0
- old
+ new
@@ -20,15 +20,17 @@
'messaging.kafka.offset' => message.offset
}
attributes['messaging.kafka.message_key'] = message.key if message.key
- span_context = OpenTelemetry::Trace.current_span(OpenTelemetry.propagation.text.extract(message.headers)).context
- link = OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
- links = [link] if link
+ parent_context = OpenTelemetry.propagation.text.extract(message.headers)
+ span_context = OpenTelemetry::Trace.current_span(parent_context).context
+ links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid?
- tracer.in_span("#{message.topic} process", links: links, attributes: attributes, kind: :consumer) do
- yield message
+ OpenTelemetry::Context.with_current(parent_context) do
+ tracer.in_span("#{message.topic} process", links: links, attributes: attributes, kind: :consumer) do
+ yield message
+ end
end
end
end
def each_batch(min_bytes: 1, max_bytes: 10_485_760, max_wait_time: 1, automatically_mark_as_processed: true) # rubocop:disable Metrics/AbcSize