Sha256: 3027025498b632aa655e824be376c88c0d91b77feab55afa2a93d78b181a16e9
Contents?: true
Size: 1.4 KB
Versions: 1
Compression:
Stored size: 1.4 KB
Contents
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
  module Instrumentation
    module RubyKafka
      module Patches
        # The Producer module contains the instrumentation patch for the Producer#produce method
        module Producer
          def produce(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, create_time: Time.now)
            attributes = {
              'messaging.system' => 'kafka',
              'messaging.destination' => topic,
              'messaging.destination_kind' => 'topic'
            }

            # If trace context is present in headers, extract and use it as parent. If there is _no_ trace context key
            # in the headers, OpenTelemetry.propagation.extract will return an unmodified copy of the current
            # Thread's context, so the next two lines preserve the correct Thread-local context.
            ctx = OpenTelemetry.propagation.extract(headers)
            OpenTelemetry::Context.with_current(ctx) do
              tracer.in_span("#{topic} send", attributes: attributes, kind: :producer) do
                OpenTelemetry.propagation.inject(headers)
                super
              end
            end
          end

          private

          def tracer
            RubyKafka::Instrumentation.instance.tracer
          end
        end
      end
    end
  end
end
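The patch above relies on a bare `super` to reach the original `produce` after the headers have been modified. The following is a minimal sketch of that pattern in plain Ruby with no OpenTelemetry or ruby-kafka dependency. It assumes the patch is applied with `Module#prepend`, which this file does not show. `FakeProducer`, `TracingPatch`, and the `traceparent` value are hypothetical stand-ins for illustration only.

```ruby
# Hypothetical stand-in for the patched producer class; it only records
# the arguments it receives.
class FakeProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  def produce(value, topic:, headers: {})
    @sent << { value: value, topic: topic, headers: headers }
  end
end

# Hypothetical patch module with the same shape as Patches::Producer.
module TracingPatch
  def produce(value, topic:, headers: {})
    # Stand-in for OpenTelemetry.propagation.inject(headers): the hash is
    # mutated in place, so the original method sees the added key.
    headers['traceparent'] ||= 'hypothetical-trace-id'
    # A bare `super` forwards the current argument values to the
    # original FakeProducer#produce.
    super
  end
end

# Prepending places the patch ahead of the class in the method lookup chain.
FakeProducer.prepend(TracingPatch)

producer = FakeProducer.new
producer.produce('hello', topic: 'greetings')
```

Because the headers hash is changed in place before `super` runs, the original method receives the injected key without any change to its own signature.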
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
opentelemetry-instrumentation-ruby_kafka-0.20.2 | lib/opentelemetry/instrumentation/ruby_kafka/patches/producer.rb |