lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb in opentelemetry-instrumentation-racecar-0.1.1 vs lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb in opentelemetry-instrumentation-racecar-0.1.2
- old
+ new
@@ -33,11 +33,13 @@
'messaging.destination_kind' => 'topic',
'messaging.kafka.partition' => payload[:partition],
'messaging.kafka.offset' => payload[:offset]
}
- attributes['messaging.kafka.message_key'] = payload[:key] if payload[:key]
+ message_key = extract_message_key(payload[:key])
+ attributes['messaging.kafka.message_key'] = message_key if message_key
+
attributes
end
def finish(name, id, payload)
span = payload.delete(:__opentelemetry_span)
@@ -51,9 +53,18 @@
end
span.finish
OpenTelemetry::Context.detach(token)
OpenTelemetry::Context.detach(parent_token)
+ end
+
+ def extract_message_key(key)
+ # skip encode if already valid utf8
+ return key if key.nil? || (key.encoding == Encoding::UTF_8 && key.valid_encoding?)
+
+ key.encode(Encoding::UTF_8)
+ rescue Encoding::UndefinedConversionError
+ nil
end
end
end
end