lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb in opentelemetry-instrumentation-ruby_kafka-0.18.5 vs lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb in opentelemetry-instrumentation-ruby_kafka-0.18.6
- old
+ new
@@ -2,10 +2,12 @@
# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0
+require_relative '../utils'
+
module OpenTelemetry
module Instrumentation
module RubyKafka
module Patches
# The Client module contains the instrumentation patch the Client#deliver_message and Client#each_message methods.
@@ -15,11 +17,13 @@
'messaging.system' => 'kafka',
'messaging.destination' => topic,
'messaging.destination_kind' => 'topic'
}
- attributes['messaging.kafka.message_key'] = key if key
+ message_key = Utils.extract_message_key(key)
+ attributes['messaging.kafka.message_key'] = message_key if message_key
+
attributes['messaging.kafka.partition'] = partition if partition
tracer.in_span("#{topic} send", attributes: attributes, kind: :producer) do
OpenTelemetry.propagation.inject(headers)
super
@@ -33,10 +37,11 @@
'messaging.destination' => message.topic,
'messaging.destination_kind' => 'topic',
'messaging.kafka.partition' => message.partition
}
- attributes['messaging.kafka.message_key'] = message.key if message.key
+ message_key = Utils.extract_message_key(message.key)
+ attributes['messaging.kafka.message_key'] = message_key if message_key
parent_context = OpenTelemetry.propagation.extract(message.headers)
OpenTelemetry::Context.with_current(parent_context) do
tracer.in_span("#{topic} process", attributes: attributes, kind: :consumer) do
yield message