Sha256: 805eb5247a9f2bbd62545684d744fef40f0e2b0dbc671ca7afd98bde5d75a2aa

Contents?: true

Size: 1.95 KB

Versions: 10

Compression:

Stored size: 1.95 KB

Contents

# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
  module Instrumentation
    module RubyKafka
      module Patches
        # The Client module contains the instrumentation patch the Client#deliver_message and Client#each_message methods.
        module Client
          def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, retries: 1)
            attributes = {
              'messaging.system' => 'kafka',
              'messaging.destination' => topic,
              'messaging.destination_kind' => 'topic'
            }

            attributes['messaging.kafka.message_key'] = key if key
            attributes['messaging.kafka.partition'] = partition if partition

            tracer.in_span("#{topic} send", attributes: attributes, kind: :producer) do
              OpenTelemetry.propagation.inject(headers)
              super
            end
          end

          def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1_048_576, &block)
            super do |message|
              attributes = {
                'messaging.system' => 'kafka',
                'messaging.destination' => message.topic,
                'messaging.destination_kind' => 'topic',
                'messaging.kafka.partition' => message.partition
              }

              attributes['messaging.kafka.message_key'] = message.key if message.key

              parent_context = OpenTelemetry.propagation.extract(message.headers)
              OpenTelemetry::Context.with_current(parent_context) do
                tracer.in_span("#{topic} process", attributes: attributes, kind: :consumer) do
                  yield message
                end
              end
            end
          end

          private

          def tracer
            RubyKafka::Instrumentation.instance.tracer
          end
        end
      end
    end
  end
end

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
opentelemetry-instrumentation-ruby_kafka-0.18.5 lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb
opentelemetry-instrumentation-ruby_kafka-0.18.4 lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb
opentelemetry-instrumentation-ruby_kafka-0.18.3 lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb
opentelemetry-instrumentation-ruby_kafka-0.18.2 lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb
opentelemetry-instrumentation-ruby_kafka-0.18.1 lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb
opentelemetry-instrumentation-ruby_kafka-0.18.0 lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb
opentelemetry-instrumentation-ruby_kafka-0.17.0 lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb
opentelemetry-instrumentation-ruby_kafka-0.16.0 lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb
opentelemetry-instrumentation-ruby_kafka-0.15.0 lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb
opentelemetry-instrumentation-ruby_kafka-0.14.0 lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb