lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb in opentelemetry-instrumentation-rdkafka-0.4.2 vs lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb in opentelemetry-instrumentation-rdkafka-0.4.3
- old
+ new
@@ -8,10 +8,17 @@
module Instrumentation
module Rdkafka
module Patches
# The Consumer module contains the instrumentation patch for the Consumer class
module Consumer
+ GETTER = if Gem::Version.new(::Rdkafka::VERSION) >= Gem::Version.new('0.13.0')
+ Context::Propagation.text_map_getter
+ else
+ OpenTelemetry::Common::Propagation.symbol_key_getter
+ end
+ private_constant :GETTER
+
def each
super do |message|
attributes = {
'messaging.system' => 'kafka',
'messaging.destination' => message.topic,
@@ -21,11 +28,11 @@
}
message_key = extract_message_key(message.key)
attributes['messaging.kafka.message_key'] = message_key if message_key
- parent_context = OpenTelemetry.propagation.extract(message.headers, getter: OpenTelemetry::Common::Propagation.symbol_key_getter)
+ parent_context = OpenTelemetry.propagation.extract(message.headers, getter: GETTER)
span_context = OpenTelemetry::Trace.current_span(parent_context).context
links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid?
OpenTelemetry::Context.with_current(parent_context) do
tracer.in_span("#{message.topic} process", links: links, attributes: attributes, kind: :consumer) do
@@ -45,10 +52,10 @@
'messaging.destination_kind' => 'topic',
'messaging.kafka.message_count' => messages.size
}
links = messages.map do |message|
- trace_context = OpenTelemetry.propagation.extract(message.headers, getter: OpenTelemetry::Common::Propagation.symbol_key_getter)
+ trace_context = OpenTelemetry.propagation.extract(message.headers, getter: GETTER)
span_context = OpenTelemetry::Trace.current_span(trace_context).context
OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
end
links.compact!