lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.10.0 vs lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.11.0

- old
+ new

@@ -1,8 +1,8 @@
 # frozen_string_literal: true
 
-# Copyright 2019 OpenTelemetry Authors
+# Copyright The OpenTelemetry Authors
 #
 # SPDX-License-Identifier: Apache-2.0
 
 require 'timeout'
@@ -46,11 +46,12 @@
           def initialize(exporter:,
                          exporter_timeout_millis: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)),
                          schedule_delay_millis: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)),
                          max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)),
                          max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)),
-                         start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i)
+                         start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i,
+                         metrics_reporter: nil)
             raise ArgumentError if max_export_batch_size > max_queue_size
 
             @exporter = exporter
             @exporter_timeout_seconds = exporter_timeout_millis / 1000.0
             @mutex = Mutex.new
@@ -58,10 +59,11 @@
             @condition = ConditionVariable.new
             @keep_running = true
             @delay_seconds = schedule_delay_millis / 1000.0
             @max_queue_size = max_queue_size
             @batch_size = max_export_batch_size
+            @metrics_reporter = metrics_reporter || OpenTelemetry::SDK::Trace::Export::MetricsReporter
             @spans = []
             @pid = nil
             @thread = nil
 
             reset_on_fork(restart_thread: start_thread_on_boot)
           end
@@ -75,10 +77,11 @@
             lock do
               reset_on_fork
               n = spans.size + 1 - max_queue_size
               spans.shift(n) if n.positive?
+              report_dropped_spans(n, reason: 'buffer-full')
               spans << span
               @condition.signal if spans.size > batch_size
             end
           end
@@ -113,10 +116,11 @@
             # Unshift the remaining spans if we timed out. We drop excess spans from
             # the snapshot because they're older than any spans in the spans buffer.
             lock do
               n = spans.size + snapshot.size - max_queue_size
               snapshot.shift(n) if n.positive?
+              report_dropped_spans(n, reason: 'buffer-full')
               spans.unshift(snapshot) unless snapshot.empty?
               @condition.signal if spans.size > max_queue_size / 2
             end
           end
@@ -134,27 +138,31 @@
             end
             @thread.join(timeout)
             force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
             @exporter.shutdown(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
+            dropped_spans = lock { spans.size }
+            report_dropped_spans(dropped_spans, reason: 'terminating') if dropped_spans.positive?
           end
 
           private
 
           attr_reader :spans, :max_queue_size, :batch_size
 
-          def work
+          def work # rubocop:disable Metrics/AbcSize
             loop do
               batch = lock do
                 reset_on_fork(restart_thread: false)
                 @condition.wait(@mutex, @delay_seconds) if spans.size < batch_size && @keep_running
                 @condition.wait(@mutex, @delay_seconds) while spans.empty? && @keep_running
                 return unless @keep_running
 
                 fetch_batch
               end
 
+              @metrics_reporter.observe_value('otel.bsp.buffer_utilization', value: spans.size / max_queue_size.to_f)
+
               export_batch(batch)
             end
           end
 
           def reset_on_fork(restart_thread: true)
@@ -171,10 +179,21 @@
             report_result(result_code, batch)
             result_code
           end
 
           def report_result(result_code, batch)
-            OpenTelemetry.logger.error("Unable to export #{batch.size} spans") unless result_code == SUCCESS
+            if result_code == SUCCESS
+              @metrics_reporter.add_to_counter('otel.bsp.export.success')
+              @metrics_reporter.add_to_counter('otel.bsp.exported_spans', increment: batch.size)
+            else
+              OpenTelemetry.logger.error("Unable to export #{batch.size} spans")
+              @metrics_reporter.add_to_counter('otel.bsp.export.failure')
+              report_dropped_spans(batch.size, reason: 'export-failure')
+            end
+          end
+
+          def report_dropped_spans(count, reason:)
+            @metrics_reporter.add_to_counter('otel.bsp.dropped_spans', increment: count, labels: { 'reason' => reason })
+          end
 
           def fetch_batch
             spans.shift(@batch_size).map!(&:to_span_data)
           end
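For context on the new metrics_reporter: keyword added in 0.11.0: the processor only calls two methods on the reporter, add_to_counter and observe_value, and falls back to OpenTelemetry::SDK::Trace::Export::MetricsReporter when none is given. Below is a minimal sketch (not part of this diff) of passing in a custom reporter; the StatsLogger class is hypothetical, and the ConsoleSpanExporter / TracerProvider wiring is purely illustrative.

require 'opentelemetry/sdk'

# Hypothetical reporter that just logs the processor's self-observability metrics.
class StatsLogger
  # Called for counters such as 'otel.bsp.dropped_spans' and 'otel.bsp.export.success'.
  def add_to_counter(metric, increment: 1, labels: {})
    OpenTelemetry.logger.info("#{metric} +#{increment} #{labels.inspect}")
  end

  # Called for observations such as 'otel.bsp.buffer_utilization'.
  def observe_value(metric, value:)
    OpenTelemetry.logger.info("#{metric} = #{value}")
  end
end

processor = OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.new(
  exporter: OpenTelemetry::SDK::Trace::Export::ConsoleSpanExporter.new,
  metrics_reporter: StatsLogger.new
)

tracer_provider = OpenTelemetry::SDK::Trace::TracerProvider.new
tracer_provider.add_span_processor(processor)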