lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb in opentelemetry-instrumentation-rdkafka-0.2.2 vs lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb in opentelemetry-instrumentation-rdkafka-0.2.3

- old
+ new

@@ -18,11 +18,13 @@ 'messaging.destination_kind' => 'topic', 'messaging.kafka.partition' => message.partition, 'messaging.kafka.offset' => message.offset } - attributes['messaging.kafka.message_key'] = message.key if message.key + message_key = extract_message_key(message.key) + attributes['messaging.kafka.message_key'] = message_key if message_key + parent_context = OpenTelemetry.propagation.extract(message.headers, getter: OpenTelemetry::Common::Propagation.symbol_key_getter) span_context = OpenTelemetry::Trace.current_span(parent_context).context links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? OpenTelemetry::Context.with_current(parent_context) do @@ -60,9 +62,18 @@ private def tracer Rdkafka::Instrumentation.instance.tracer + end + + def extract_message_key(key) + # skip encode if already valid utf8 + return key if key.nil? || (key.encoding == Encoding::UTF_8 && key.valid_encoding?) + + key.encode(Encoding::UTF_8) + rescue Encoding::UndefinedConversionError + nil end end end end end