lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb in opentelemetry-instrumentation-rdkafka-0.4.2 vs lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb in opentelemetry-instrumentation-rdkafka-0.4.3

- old
+ new

@@ -8,10 +8,17 @@ module Instrumentation module Rdkafka module Patches # The Consumer module contains the instrumentation patch for the Consumer class module Consumer + GETTER = if Gem::Version.new(::Rdkafka::VERSION) >= Gem::Version.new('0.13.0') + Context::Propagation.text_map_getter + else + OpenTelemetry::Common::Propagation.symbol_key_getter + end + private_constant :GETTER + def each super do |message| attributes = { 'messaging.system' => 'kafka', 'messaging.destination' => message.topic, @@ -21,11 +28,11 @@ } message_key = extract_message_key(message.key) attributes['messaging.kafka.message_key'] = message_key if message_key - parent_context = OpenTelemetry.propagation.extract(message.headers, getter: OpenTelemetry::Common::Propagation.symbol_key_getter) + parent_context = OpenTelemetry.propagation.extract(message.headers, getter: GETTER) span_context = OpenTelemetry::Trace.current_span(parent_context).context links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? OpenTelemetry::Context.with_current(parent_context) do tracer.in_span("#{message.topic} process", links: links, attributes: attributes, kind: :consumer) do @@ -45,10 +52,10 @@ 'messaging.destination_kind' => 'topic', 'messaging.kafka.message_count' => messages.size } links = messages.map do |message| - trace_context = OpenTelemetry.propagation.extract(message.headers, getter: OpenTelemetry::Common::Propagation.symbol_key_getter) + trace_context = OpenTelemetry.propagation.extract(message.headers, getter: GETTER) span_context = OpenTelemetry::Trace.current_span(trace_context).context OpenTelemetry::Trace::Link.new(span_context) if span_context.valid? end links.compact!