lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.8.0 vs lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.9.0

- old
+ new

@@ -11,20 +11,23 @@ module Trace module Export # Implementation of the duck type SpanProcessor that batches spans # exported by the SDK then pushes them to the exporter pipeline. # + # Typically, the BatchSpanProcessor will be more suitable for + # production environments than the SimpleSpanProcessor. + # # All spans reported by the SDK implementation are first added to a # synchronized queue (with a {max_queue_size} maximum size; once the # size is reached the oldest spans are dropped) and exported every # schedule_delay_millis to the exporter pipeline in batches of # max_export_batch_size. # # If the queue gets half full a preemptive notification is sent to the # worker thread that exports the spans to wake up and start a new # export cycle. - class BatchSpanProcessor + class BatchSpanProcessor # rubocop:disable Metrics/ClassLength # Returns a new instance of the {BatchSpanProcessor}. # # @param [SpanExporter] exporter # @param [Numeric] exporter_timeout_millis the maximum time allowed # for a single export call. Defaults to the value of the OTEL_BSP_EXPORT_TIMEOUT_MILLIS @@ -42,83 +45,99 @@ # @return a new instance of the {BatchSpanProcessor}. 
def initialize(exporter:,
               exporter_timeout_millis: Float(ENV.fetch('OTEL_BSP_EXPORT_TIMEOUT_MILLIS', 30_000)),
               schedule_delay_millis: Float(ENV.fetch('OTEL_BSP_SCHEDULE_DELAY_MILLIS', 5_000)),
               max_queue_size: Integer(ENV.fetch('OTEL_BSP_MAX_QUEUE_SIZE', 2048)),
               max_export_batch_size: Integer(ENV.fetch('OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 512)),
               start_thread_on_boot: String(ENV['OTEL_RUBY_BSP_START_THREAD_ON_BOOT']) !~ /false/i)
  # A batch may never be larger than the queue it is drawn from; say so
  # explicitly instead of raising a bare ArgumentError.
  if max_export_batch_size > max_queue_size
    raise ArgumentError,
          "max_export_batch_size (#{max_export_batch_size}) must not exceed " \
          "max_queue_size (#{max_queue_size})"
  end

  @exporter = exporter
  # Timeouts and delays are configured in milliseconds but stored in seconds.
  @exporter_timeout_seconds = exporter_timeout_millis / 1000.0
  @mutex = Mutex.new
  # Separate mutex held around calls to @exporter.export.
  @export_mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @delay_seconds = schedule_delay_millis / 1000.0
  @max_queue_size = max_queue_size
  @batch_size = max_export_batch_size
  @spans = []
  @pid = nil
  @thread = nil
  # start_thread_on_boot defaults to true unless the environment variable
  # contains "false" (case-insensitive). When it is false no worker thread
  # is created here and @thread stays nil.
  reset_on_fork(restart_thread: start_thread_on_boot)
end

# Does nothing for this processor.
def on_start(_span, _parent_context); end

# Adds a span to the batch. Thread-safe; may block on lock.
#
# Spans whose trace flags are not sampled are ignored.
def on_finish(span) # rubocop:disable Metrics/AbcSize
  return unless span.context.trace_flags.sampled?

  lock do
    reset_on_fork
    # Keep the queue bounded: drop the oldest spans to make room for this one.
    n = spans.size + 1 - max_queue_size
    spans.shift(n) if n.positive?
    spans << span
    # Signal the condition variable once more than one full batch is queued.
    @condition.signal if spans.size > batch_size
  end
end

# Export all ended spans to the configured `Exporter` that have not yet
# been exported.
#
# This method should only be called in cases where it is absolutely
# necessary, such as when using some FaaS providers that may suspend
# the process after an invocation, but before the `Processor` exports
# the completed spans.
#
# @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] SUCCESS if no error occurred, FAILURE if a
#   non-specific failure occurred, TIMEOUT if a timeout occurred.
def force_flush(timeout: nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
  start_time = Time.now
  # Take everything currently queued; new spans may keep arriving in `spans`
  # while the snapshot is being exported.
  snapshot = lock do
    reset_on_fork(restart_thread: false) if @keep_running
    spans.shift(spans.size)
  end
  until snapshot.empty?
    remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)
    return TIMEOUT if remaining_timeout&.zero?

    batch = snapshot.shift(@batch_size).map!(&:to_span_data)
    result_code = export_batch(batch, timeout: remaining_timeout)
    return result_code unless result_code == SUCCESS
  end

  SUCCESS
ensure
  # Unshift the remaining spans if we timed out. We drop excess spans from
  # the snapshot because they're older than any spans in the spans buffer.
  lock do
    n = spans.size + snapshot.size - max_queue_size
    snapshot.shift(n) if n.positive?
    # The splat matters: unshift(snapshot) would prepend the whole Array as a
    # single nested element instead of putting the individual spans back.
    spans.unshift(*snapshot) unless snapshot.empty?
    @condition.signal if spans.size > max_queue_size / 2
  end
end

# Shuts the consumer thread down and flushes the current accumulated buffer.
# Blocks until the thread has finished or the timeout elapses.
#
# @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] SUCCESS if no error occurred, FAILURE if a
#   non-specific failure occurred, TIMEOUT if a timeout occurred.
- def shutdown + def shutdown(timeout: nil) + start_time = Time.now lock do @keep_running = false @condition.signal end - @thread.join - force_flush - @exporter.shutdown + @thread.join(timeout) + force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)) + @exporter.shutdown(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)) end private attr_reader :spans, :max_queue_size, :batch_size @@ -145,18 +164,13 @@ @pid = pid spans.clear @thread = Thread.new { work } if restart_thread end - def export_batch(batch) - result_code = export_with_timeout(batch) + def export_batch(batch, timeout: @exporter_timeout_seconds) + result_code = @export_mutex.synchronize { @exporter.export(batch, timeout: timeout) } report_result(result_code, batch) - end - - def export_with_timeout(batch) - Timeout.timeout(@exporter_timeout_seconds) { @exporter.export(batch) } - rescue Timeout::Error - FAILURE + result_code end def report_result(result_code, batch) OpenTelemetry.logger.error("Unable to export #{batch.size} spans") unless result_code == SUCCESS end