lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb in opentelemetry-instrumentation-rdkafka-0.2.2 vs lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb in opentelemetry-instrumentation-rdkafka-0.2.3
- old
+ new
@@ -18,11 +18,13 @@
'messaging.destination_kind' => 'topic',
'messaging.kafka.partition' => message.partition,
'messaging.kafka.offset' => message.offset
}
- attributes['messaging.kafka.message_key'] = message.key if message.key
+ message_key = extract_message_key(message.key)
+ attributes['messaging.kafka.message_key'] = message_key if message_key
+
parent_context = OpenTelemetry.propagation.extract(message.headers, getter: OpenTelemetry::Common::Propagation.symbol_key_getter)
span_context = OpenTelemetry::Trace.current_span(parent_context).context
links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid?
OpenTelemetry::Context.with_current(parent_context) do
@@ -60,9 +62,18 @@
private
def tracer
Rdkafka::Instrumentation.instance.tracer
+ end
+
+ def extract_message_key(key)
+ # skip encode if already valid utf8
+ return key if key.nil? || (key.encoding == Encoding::UTF_8 && key.valid_encoding?)
+
+ key.encode(Encoding::UTF_8)
+ rescue Encoding::UndefinedConversionError
+ nil
end
end
end
end
end