Sha256: 2c48d89896c0c0345b46d0c48e25d46c40e5fbca86c97fef09d579248143b826

Contents?: true

Size: 1.4 KB

Versions: 10

Compression:

Stored size: 1.4 KB

Contents

# frozen_string_literal: true

module Sbmt
  module KafkaProducer
    module Instrumentation
      # Callable that decorates an outgoing Kafka message with OpenTelemetry
      # tracing data.
      #
      # When tracing is enabled, {#call} opens a producer span named
      # "<topic> publish" and, inside that span, injects the current
      # propagation context into the message headers. When tracing is
      # disabled the message is returned untouched.
      class OpenTelemetryTracer
        class << self
          # @return [Boolean] whether tracing has been switched on; any
          #   truthy value assigned through +enabled=+ is coerced to +true+
          def enabled?
            !!@enabled
          end

          attr_writer :enabled
        end

        # @return [Boolean] the class-level enabled flag
        def enabled?
          self.class.enabled?
        end

        # Traces the publication of a single message.
        #
        # @param message [Hash] message with at least a +:topic+ entry and
        #   optional +:key+ / +:headers+ entries; it is mutated in place
        #   (+:headers+ is created when missing and receives the injected
        #   propagation data)
        # @return [Hash] the same +message+ object
        def call(message)
          return message unless enabled?

          topic = message[:topic]
          attributes = {
            "messaging.system" => "kafka",
            "messaging.destination" => topic,
            "messaging.destination_kind" => "topic"
          }

          # The key is attached only when it can be represented as valid UTF-8.
          message_key = extract_message_key(message[:key])
          attributes["messaging.kafka.message_key"] = message_key if message_key

          message[:headers] ||= {}

          tracer.in_span("#{topic} publish", attributes: attributes, kind: :producer) do
            ::OpenTelemetry.propagation.inject(message[:headers])
          end

          message
        end

        private

        # @return [Object] the tracer exposed by the OpenTelemetry loader singleton
        def tracer
          ::Sbmt::KafkaProducer::Instrumentation::OpenTelemetryLoader.instance.tracer
        end

        # Converts a message key into a valid UTF-8 string usable as a span
        # attribute.
        #
        # @param key [String, nil] raw message key in any encoding
        # @return [String, nil] the key as valid UTF-8, or +nil+ when the key
        #   is absent or cannot be represented as valid UTF-8
        def extract_message_key(key)
          return if key.nil?
          # skip encode if already valid utf8
          return key if key.encoding == Encoding::UTF_8 && key.valid_encoding?

          encoded = key.encode(Encoding::UTF_8)
          # Encoding UTF-8 -> UTF-8 is a no-op, so a key tagged as UTF-8 but
          # holding invalid bytes comes back unchanged; reject it explicitly.
          encoded if encoded.valid_encoding?
        rescue EncodingError
          # Covers undefined conversions, invalid byte sequences and missing
          # converters: a key that cannot be transcoded is skipped instead of
          # aborting the publish.
          nil
        end
      end
    end
  end
end

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
sbmt-kafka_producer-3.2.0 lib/sbmt/kafka_producer/instrumentation/open_telemetry_tracer.rb
sbmt-kafka_producer-3.1.1 lib/sbmt/kafka_producer/instrumentation/open_telemetry_tracer.rb
sbmt-kafka_producer-3.1.0 lib/sbmt/kafka_producer/instrumentation/open_telemetry_tracer.rb
sbmt-kafka_producer-3.0.0 lib/sbmt/kafka_producer/instrumentation/open_telemetry_tracer.rb
sbmt-kafka_producer-2.2.3 lib/sbmt/kafka_producer/instrumentation/open_telemetry_tracer.rb
sbmt-kafka_producer-2.2.2 lib/sbmt/kafka_producer/instrumentation/open_telemetry_tracer.rb
sbmt-kafka_producer-2.2.1 lib/sbmt/kafka_producer/instrumentation/open_telemetry_tracer.rb
sbmt-kafka_producer-2.2.0 lib/sbmt/kafka_producer/instrumentation/open_telemetry_tracer.rb
sbmt-kafka_producer-2.1.0 lib/sbmt/kafka_producer/instrumentation/open_telemetry_tracer.rb
sbmt-kafka_producer-2.0.0 lib/sbmt/kafka_producer/instrumentation/open_telemetry_tracer.rb