Sha256: e0dd5134e8954c3aff736fef2e3f96addee7673721d80ed6a89a0ad3ad47c0aa
Contents?: true
Size: 1.41 KB
Versions: 4
Compression:
Stored size: 1.41 KB
Contents
# frozen_string_literal: true # Copyright The OpenTelemetry Authors # # SPDX-License-Identifier: Apache-2.0 module OpenTelemetry module Instrumentation module RubyKafka module Patches # The Producer module contains the instrumentation patch the Producer#produce method module Producer def produce(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, create_time: Time.now) attributes = { 'messaging.system' => 'kafka', 'messaging.destination' => topic, 'messaging.destination_kind' => 'topic' } # If trace context is present in headers, extract and use it as parent. If there is _no_ trace context key # in the headers, OpenTelemetry.propagation.extract will return an unmodified copy of the the current # Thread's context, so the next two lines preserve the correct Thread-local context. ctx = OpenTelemetry.propagation.extract(headers) OpenTelemetry::Context.with_current(ctx) do tracer.in_span("#{topic} publish", attributes: attributes, kind: :producer) do OpenTelemetry.propagation.inject(headers) super end end end private def tracer RubyKafka::Instrumentation.instance.tracer end end end end end end
Version data entries
4 entries across 4 versions & 1 rubygems