lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb in opentelemetry-instrumentation-ruby_kafka-0.18.5 vs lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb in opentelemetry-instrumentation-ruby_kafka-0.18.6

- old
+ new

@@ -2,10 +2,12 @@ # Copyright The OpenTelemetry Authors # # SPDX-License-Identifier: Apache-2.0 +require_relative '../utils' + module OpenTelemetry module Instrumentation module RubyKafka module Patches # The Client module contains the instrumentation patch the Client#deliver_message and Client#each_message methods. @@ -15,11 +17,13 @@ 'messaging.system' => 'kafka', 'messaging.destination' => topic, 'messaging.destination_kind' => 'topic' } - attributes['messaging.kafka.message_key'] = key if key + message_key = Utils.extract_message_key(key) + attributes['messaging.kafka.message_key'] = message_key if message_key + attributes['messaging.kafka.partition'] = partition if partition tracer.in_span("#{topic} send", attributes: attributes, kind: :producer) do OpenTelemetry.propagation.inject(headers) super @@ -33,10 +37,11 @@ 'messaging.destination' => message.topic, 'messaging.destination_kind' => 'topic', 'messaging.kafka.partition' => message.partition } - attributes['messaging.kafka.message_key'] = message.key if message.key + message_key = Utils.extract_message_key(message.key) + attributes['messaging.kafka.message_key'] = message_key if message_key parent_context = OpenTelemetry.propagation.extract(message.headers) OpenTelemetry::Context.with_current(parent_context) do tracer.in_span("#{topic} process", attributes: attributes, kind: :consumer) do yield message