lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.10.0 vs lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.11.0
- old
+ new
@@ -1,8 +1,8 @@
# frozen_string_literal: true
-# Copyright 2019 OpenTelemetry Authors
+# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0
require 'timeout'
@@ -46,11 +46,12 @@
# Builds the processor's state and then delegates to reset_on_fork.
#
# In this diff view, the line marked `-` is the 0.10.0 signature tail and the
# lines marked `+` are the 0.11.0 additions (the metrics_reporter option).
#
# @param exporter stored as @exporter
# @param exporter_timeout_millis [Float] defaults to OTEL_BSP_EXPORT_TIMEOUT_MILLIS
#   or 30_000; stored in seconds as @exporter_timeout_seconds
# @param schedule_delay_millis [Float] defaults to OTEL_BSP_SCHEDULE_DELAY_MILLIS
#   or 5_000; stored in seconds as @delay_seconds
# @param max_queue_size [Integer] defaults to OTEL_BSP_MAX_QUEUE_SIZE or 2048
# @param max_export_batch_size [Integer] defaults to OTEL_BSP_MAX_EXPORT_BATCH_SIZE
#   or 512; stored as @batch_size
# @param start_thread_on_boot [Boolean] true unless OTEL_RUBY_BSP_START_THREAD_ON_BOOT
#   matches /false/i (an unset variable becomes "" and therefore yields true);
#   forwarded to reset_on_fork as restart_thread
# @param metrics_reporter when nil, OpenTelemetry::SDK::Trace::Export::MetricsReporter
#   is used instead
# @raise [ArgumentError] if max_export_batch_size is greater than max_queue_size
def initialize(exporter:,
exporter_timeout_millis: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)),
schedule_delay_millis: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)),
max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)),
max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)),
- start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i)
+ start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i,
+ metrics_reporter: nil)
# A single batch must fit inside the queue.
raise ArgumentError if max_export_batch_size > max_queue_size
@exporter = exporter
# Millisecond options are converted to (fractional) seconds once, here.
@exporter_timeout_seconds = exporter_timeout_millis / 1000.0
@mutex = Mutex.new
@@ -58,10 +59,11 @@
@condition = ConditionVariable.new
@keep_running = true
@delay_seconds = schedule_delay_millis / 1000.0
@max_queue_size = max_queue_size
@batch_size = max_export_batch_size
+ @metrics_reporter = metrics_reporter || OpenTelemetry::SDK::Trace::Export::MetricsReporter
@spans = []
@pid = nil
@thread = nil
reset_on_fork(restart_thread: start_thread_on_boot)
end
@@ -75,10 +77,11 @@
lock do
reset_on_fork
n = spans.size + 1 - max_queue_size
spans.shift(n) if n.positive?
+ report_dropped_spans(n, reason: 'buffer-full')
spans << span
@condition.signal if spans.size > batch_size
end
end
@@ -113,10 +116,11 @@
# Unshift the remaining spans if we timed out. We drop excess spans from
# the snapshot because they're older than any spans in the spans buffer.
lock do
n = spans.size + snapshot.size - max_queue_size
snapshot.shift(n) if n.positive?
+ report_dropped_spans(n, reason: 'buffer-full')
spans.unshift(snapshot) unless snapshot.empty?
@condition.signal if spans.size > max_queue_size / 2
end
end
@@ -134,27 +138,31 @@
end
@thread.join(timeout)
force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
@exporter.shutdown(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
+ dropped_spans = lock { spans.size }
+ report_dropped_spans(dropped_spans, reason: 'terminating') if dropped_spans.positive?
end
private
attr_reader :spans, :max_queue_size, :batch_size
# Export loop: repeatedly takes a batch from the span buffer, reports buffer
# utilization, and exports the batch, until @keep_running becomes falsy.
# Presumably this is the body of the background thread held in @thread;
# confirm in reset_on_fork, which is not fully visible here.
#
# In this diff view the `-` line is the 0.10.0 definition and the `+` lines
# are the 0.11.0 additions.
- def work
+ def work # rubocop:disable Metrics/AbcSize
loop do
batch = lock do
reset_on_fork(restart_thread: false)
# With fewer than batch_size spans buffered, wait once (at most @delay_seconds)
# for more to arrive.
@condition.wait(@mutex, @delay_seconds) if spans.size < batch_size && @keep_running
# Never proceed with an empty buffer: keep waiting in @delay_seconds slices.
@condition.wait(@mutex, @delay_seconds) while spans.empty? && @keep_running
# `return` inside the block exits #work itself, ending the loop.
return unless @keep_running
fetch_batch
end
# Utilization is the remaining buffer length divided by max_queue_size.
# NOTE(review): spans.size is read here after the `lock` block has closed;
# confirm an unsynchronized read is acceptable for this metric.
+ @metrics_reporter.observe_value('otel.bsp.buffer_utilization', value: spans.size / max_queue_size.to_f)
+
export_batch(batch)
end
end
def reset_on_fork(restart_thread: true)
@@ -171,10 +179,21 @@
report_result(result_code, batch)
result_code
end
# Records the outcome of one export attempt.
#
# In this diff view the `-` line is the 0.10.0 body (log an error unless the
# export succeeded) and the `+` lines are the 0.11.0 body, which additionally
# updates counters on @metrics_reporter:
# - on SUCCESS: increments 'otel.bsp.export.success' and adds batch.size to
#   'otel.bsp.exported_spans';
# - otherwise: logs the error, increments 'otel.bsp.export.failure', and
#   reports the whole batch as dropped with reason 'export-failure'.
#
# @param result_code value compared against SUCCESS
# @param batch collection of exported spans; only its size is used
def report_result(result_code, batch)
- OpenTelemetry.logger.error("Unable to export #{batch.size} spans") unless result_code == SUCCESS
+ if result_code == SUCCESS
+ @metrics_reporter.add_to_counter('otel.bsp.export.success')
+ @metrics_reporter.add_to_counter('otel.bsp.exported_spans', increment: batch.size)
+ else
+ OpenTelemetry.logger.error("Unable to export #{batch.size} spans")
+ @metrics_reporter.add_to_counter('otel.bsp.export.failure')
+ report_dropped_spans(batch.size, reason: 'export-failure')
+ end
+ end
+
# Adds count to the 'otel.bsp.dropped_spans' counter, labelled with the
# reason the spans were dropped.
#
# Some call sites compute an overflow such as
# "spans.size + 1 - max_queue_size" and pass it along unconditionally, so
# count is zero or negative whenever nothing actually overflowed. Such values
# are ignored here so the counter is never decremented.
#
# @param count [Integer] number of spans dropped; non-positive values are ignored
# @param reason [String] value recorded under the 'reason' label
#   (e.g. 'buffer-full', 'terminating', 'export-failure')
# @return [void]
def report_dropped_spans(count, reason:)
  return unless count.positive?

  @metrics_reporter.add_to_counter('otel.bsp.dropped_spans', increment: count, labels: { 'reason' => reason })
end
# Removes up to @batch_size spans from the front of the buffer and converts
# each of them, in place, by calling to_span_data on it.
#
# @return [Array] the converted entries, in buffer order
def fetch_batch
  pending = spans.shift(@batch_size)
  pending.map!(&:to_span_data)
end