lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-1.3.2 vs lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-1.4.0

- old
+ new

@@ -108,11 +108,11 @@ end until snapshot.empty? remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time) return TIMEOUT if remaining_timeout&.zero? - batch = snapshot.shift(@batch_size).map!(&:to_span_data) + batch = snapshot.shift(@batch_size) result_code = export_batch(batch, timeout: remaining_timeout) return result_code unless result_code == SUCCESS end @exporter.force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)) @@ -123,11 +123,11 @@ n = spans.size + snapshot.size - max_queue_size if n.positive? dropped_spans = snapshot.shift(n) report_dropped_spans(dropped_spans, reason: 'buffer-full', function: __method__.to_s) end - spans.unshift(snapshot) unless snapshot.empty? + spans.unshift(*snapshot) unless snapshot.empty? @condition.signal if spans.size > max_queue_size / 2 end end # Shuts the consumer thread down and flushes the current accumulated buffer @@ -160,11 +160,11 @@ batch = lock do @condition.wait(@mutex, @delay_seconds) if spans.size < batch_size && @keep_running @condition.wait(@mutex, @delay_seconds) while spans.empty? && @keep_running return unless @keep_running - fetch_batch + spans.shift(@batch_size) end @metrics_reporter.observe_value('otel.bsp.buffer_utilization', value: spans.size / max_queue_size.to_f) export_batch(batch) @@ -181,36 +181,33 @@ rescue ThreadError => e @metrics_reporter.add_to_counter('otel.bsp.error', labels: { 'reason' => 'ThreadError' }) OpenTelemetry.handle_error(exception: e, message: 'unexpected error in BatchSpanProcessor#reset_on_fork') end - def export_batch(batch, timeout: @exporter_timeout_seconds) + def export_batch(span_array, timeout: @exporter_timeout_seconds) + batch = span_array.map(&:to_span_data) result_code = @export_mutex.synchronize { @exporter.export(batch, timeout: timeout) } - report_result(result_code, batch) + report_result(result_code, span_array) result_code rescue StandardError => e - report_result(FAILURE, batch) + report_result(FAILURE, span_array) @metrics_reporter.add_to_counter('otel.bsp.error', labels: { 'reason' => e.class.to_s }) FAILURE end - def report_result(result_code, batch) + def report_result(result_code, span_array) if result_code == SUCCESS @metrics_reporter.add_to_counter('otel.bsp.export.success') - @metrics_reporter.add_to_counter('otel.bsp.exported_spans', increment: batch.size) + @metrics_reporter.add_to_counter('otel.bsp.exported_spans', increment: span_array.size) else - OpenTelemetry.handle_error(exception: ExportError.new("Unable to export #{batch.size} spans")) + OpenTelemetry.handle_error(exception: ExportError.new(span_array)) @metrics_reporter.add_to_counter('otel.bsp.export.failure') - report_dropped_spans(batch, reason: 'export-failure') + report_dropped_spans(span_array, reason: 'export-failure') end end def report_dropped_spans(dropped_spans, reason:, function: nil) @metrics_reporter.add_to_counter('otel.bsp.dropped_spans', increment: dropped_spans.size, labels: { 'reason' => reason, OpenTelemetry::SemanticConventions::Trace::CODE_FUNCTION => function }.compact) - end - - def fetch_batch - spans.shift(@batch_size).map!(&:to_span_data) end def lock @mutex.synchronize do yield