Sha256: a6eb5f74a77f8bc9522a86c203819f7f841d6a19e5f927a6dfb7e427d74e5e22

Contents?: true

Size: 840 Bytes

Versions: 5

Compression:

Stored size: 840 Bytes

Contents

# frozen_string_literal: true

module OpenTelemetry
  module Instrumentation
    module RubyKafka
      module Patches
        # The AsyncProducer module contains the instrumentation patch for the AsyncProducer#produce method
        module AsyncProducer
          def produce(value, topic:, **options)
            options = __otel_merge_options!(**options)
            super
          end

          def __otel_merge_options!(**options)
            options ||= { headers: {} }
            # The propagator mutates the carrier (its first positional argument) in place, so default
            # headers to an empty hash when the caller passed none; that gives it something to mutate
            options[:headers] = {} unless options[:headers]
            OpenTelemetry.propagation.inject(options[:headers])
            options
          end
        end
      end
    end
  end
end
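
The patch above is applied by prepending the module, so its `produce` runs first, injects trace context into the message headers, and then hands off to the original method. The following is a minimal, self-contained sketch of that pattern, not the gem's actual wiring. `FakePropagation`, `FakeAsyncProducer`, and `HeaderInjectingPatch` are hypothetical stand-ins for `OpenTelemetry.propagation`, the ruby-kafka async producer, and the patch module, and the `traceparent` value is a made-up example. The sketch also calls `super` with explicit arguments rather than the bare `super` used in the original.

```ruby
# Hypothetical stand-in for OpenTelemetry.propagation: writes a fixed
# example traceparent value into the carrier hash it is given.
module FakePropagation
  def self.inject(carrier)
    carrier['traceparent'] = '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01'
    carrier
  end
end

# Hypothetical stand-in for the async producer: records each call it receives.
class FakeAsyncProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  def produce(value, topic:, **options)
    @sent << { value: value, topic: topic, options: options }
  end
end

# Same idea as the patch above: make sure a headers hash exists, let the
# propagator mutate it, then delegate to the original produce.
module HeaderInjectingPatch
  def produce(value, topic:, **options)
    options[:headers] ||= {}
    FakePropagation.inject(options[:headers])
    super(value, topic: topic, **options)
  end
end

# Prepending places the patch ahead of the class in the method lookup chain.
FakeAsyncProducer.prepend(HeaderInjectingPatch)

producer = FakeAsyncProducer.new
producer.produce('hello', topic: 'greetings')
producer.produce('bye', topic: 'greetings', headers: { 'app' => 'demo' })
```

After both calls, each recorded message carries a `traceparent` header, and the caller-supplied `app` header on the second message is preserved alongside it.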

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
opentelemetry-instrumentation-ruby_kafka-0.21.3 lib/opentelemetry/instrumentation/ruby_kafka/patches/async_producer.rb
opentelemetry-instrumentation-ruby_kafka-0.21.2 lib/opentelemetry/instrumentation/ruby_kafka/patches/async_producer.rb
opentelemetry-instrumentation-ruby_kafka-0.21.1 lib/opentelemetry/instrumentation/ruby_kafka/patches/async_producer.rb
opentelemetry-instrumentation-ruby_kafka-0.21.0 lib/opentelemetry/instrumentation/ruby_kafka/patches/async_producer.rb
opentelemetry-instrumentation-ruby_kafka-0.20.2 lib/opentelemetry/instrumentation/ruby_kafka/patches/async_producer.rb