lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.8.0 vs lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.9.0
- old
+ new
@@ -11,20 +11,23 @@
module Trace
module Export
# Implementation of the duck type SpanProcessor that batches spans
# exported by the SDK then pushes them to the exporter pipeline.
#
+ # Typically, the BatchSpanProcessor will be more suitable for
+ # production environments than the SimpleSpanProcessor.
+ #
# All spans reported by the SDK implementation are first added to a
# synchronized queue (with a {max_queue_size} maximum size, after the
# size is reached spans are dropped) and exported every
# schedule_delay_millis to the exporter pipeline in batches of
# max_export_batch_size.
#
# If the queue gets half full a preemptive notification is sent to the
# worker thread that exports the spans to wake up and start a new
# export cycle.
- class BatchSpanProcessor
+ class BatchSpanProcessor # rubocop:disable Metrics/ClassLength
# Returns a new instance of the {BatchSpanProcessor}.
#
# @param [SpanExporter] exporter
# @param [Numeric] exporter_timeout_millis the delay interval between two
# consecutive exports. Defaults to the value of the OTEL_BSP_EXPORT_TIMEOUT_MILLIS
@@ -42,83 +45,99 @@
# @return a new instance of the {BatchSpanProcessor}.
def initialize(exporter:,
exporter_timeout_millis: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)),
schedule_delay_millis: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)),
max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)),
- max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)))
+ max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)),
+ start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i)
raise ArgumentError if max_export_batch_size > max_queue_size
@exporter = exporter
@exporter_timeout_seconds = exporter_timeout_millis / 1000.0
@mutex = Mutex.new
+ @export_mutex = Mutex.new
@condition = ConditionVariable.new
@keep_running = true
@delay_seconds = schedule_delay_millis / 1000.0
@max_queue_size = max_queue_size
@batch_size = max_export_batch_size
@spans = []
@pid = nil
@thread = nil
- reset_on_fork
+ reset_on_fork(restart_thread: start_thread_on_boot)
end
- # does nothing for this processor
- def on_start(span, parent_context)
- # noop
- end
+ # Does nothing for this processor
+ def on_start(_span, _parent_context); end
- # adds a span to the batcher, threadsafe may block on lock
+ # Adds a span to the batch. Thread-safe; may block on lock.
def on_finish(span) # rubocop:disable Metrics/AbcSize
return unless span.context.trace_flags.sampled?
lock do
reset_on_fork
n = spans.size + 1 - max_queue_size
spans.shift(n) if n.positive?
spans << span
- @condition.signal if spans.size > max_queue_size / 2
+ @condition.signal if spans.size > batch_size
end
end
- # TODO: test this explicitly.
# Export all ended spans to the configured `Exporter` that have not yet
# been exported.
#
# This method should only be called in cases where it is absolutely
# necessary, such as when using some FaaS providers that may suspend
# the process after an invocation, but before the `Processor` exports
# the completed spans.
#
+ # @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] SUCCESS if no error occurred, FAILURE if a
# non-specific failure occurred, TIMEOUT if a timeout occurred.
- def force_flush
+ def force_flush(timeout: nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
+ start_time = Time.now
snapshot = lock do
reset_on_fork(restart_thread: false) if @keep_running
spans.shift(spans.size)
end
until snapshot.empty?
+ remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)
+ return TIMEOUT if remaining_timeout&.zero?
+
batch = snapshot.shift(@batch_size).map!(&:to_span_data)
- result_code = @exporter.export(batch)
- report_result(result_code, batch)
+ result_code = export_batch(batch, timeout: remaining_timeout)
+ return result_code unless result_code == SUCCESS
end
+
SUCCESS
+ ensure
+ # Unshift the remaining spans if we timed out. We drop excess spans from
+ # the snapshot because they're older than any spans in the spans buffer.
+ lock do
+ n = spans.size + snapshot.size - max_queue_size
+ snapshot.shift(n) if n.positive?
+ spans.unshift(snapshot) unless snapshot.empty?
+ @condition.signal if spans.size > max_queue_size / 2
+ end
end
- # shuts the consumer thread down and flushes the current accumulated buffer
- # will block until the thread is finished
+ # Shuts the consumer thread down and flushes the current accumulated buffer
+ # will block until the thread is finished.
#
+ # @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] SUCCESS if no error occurred, FAILURE if a
# non-specific failure occurred, TIMEOUT if a timeout occurred.
- def shutdown
+ def shutdown(timeout: nil)
+ start_time = Time.now
lock do
@keep_running = false
@condition.signal
end
- @thread.join
- force_flush
- @exporter.shutdown
+ @thread.join(timeout)
+ force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
+ @exporter.shutdown(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
end
private
attr_reader :spans, :max_queue_size, :batch_size
@@ -145,18 +164,13 @@
@pid = pid
spans.clear
@thread = Thread.new { work } if restart_thread
end
- def export_batch(batch)
- result_code = export_with_timeout(batch)
+ def export_batch(batch, timeout: @exporter_timeout_seconds)
+ result_code = @export_mutex.synchronize { @exporter.export(batch, timeout: timeout) }
report_result(result_code, batch)
- end
-
- def export_with_timeout(batch)
- Timeout.timeout(@exporter_timeout_seconds) { @exporter.export(batch) }
- rescue Timeout::Error
- FAILURE
+ result_code
end
def report_result(result_code, batch)
OpenTelemetry.logger.error("Unable to export #{batch.size} spans") unless result_code == SUCCESS
end