lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-1.3.2 vs lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-1.4.0
- old
+ new
@@ -108,11 +108,11 @@
end
until snapshot.empty?
remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)
return TIMEOUT if remaining_timeout&.zero?
- batch = snapshot.shift(@batch_size).map!(&:to_span_data)
+ batch = snapshot.shift(@batch_size)
result_code = export_batch(batch, timeout: remaining_timeout)
return result_code unless result_code == SUCCESS
end
@exporter.force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
@@ -123,11 +123,11 @@
n = spans.size + snapshot.size - max_queue_size
if n.positive?
dropped_spans = snapshot.shift(n)
report_dropped_spans(dropped_spans, reason: 'buffer-full', function: __method__.to_s)
end
- spans.unshift(snapshot) unless snapshot.empty?
+ spans.unshift(*snapshot) unless snapshot.empty?
@condition.signal if spans.size > max_queue_size / 2
end
end
# Shuts the consumer thread down and flushes the current accumulated buffer
@@ -160,11 +160,11 @@
batch = lock do
@condition.wait(@mutex, @delay_seconds) if spans.size < batch_size && @keep_running
@condition.wait(@mutex, @delay_seconds) while spans.empty? && @keep_running
return unless @keep_running
- fetch_batch
+ spans.shift(@batch_size)
end
@metrics_reporter.observe_value('otel.bsp.buffer_utilization', value: spans.size / max_queue_size.to_f)
export_batch(batch)
@@ -181,36 +181,33 @@
rescue ThreadError => e
@metrics_reporter.add_to_counter('otel.bsp.error', labels: { 'reason' => 'ThreadError' })
OpenTelemetry.handle_error(exception: e, message: 'unexpected error in BatchSpanProcessor#reset_on_fork')
end
- def export_batch(batch, timeout: @exporter_timeout_seconds)
+ def export_batch(span_array, timeout: @exporter_timeout_seconds)
+ batch = span_array.map(&:to_span_data)
result_code = @export_mutex.synchronize { @exporter.export(batch, timeout: timeout) }
- report_result(result_code, batch)
+ report_result(result_code, span_array)
result_code
rescue StandardError => e
- report_result(FAILURE, batch)
+ report_result(FAILURE, span_array)
@metrics_reporter.add_to_counter('otel.bsp.error', labels: { 'reason' => e.class.to_s })
FAILURE
end
- def report_result(result_code, batch)
+ def report_result(result_code, span_array)
if result_code == SUCCESS
@metrics_reporter.add_to_counter('otel.bsp.export.success')
- @metrics_reporter.add_to_counter('otel.bsp.exported_spans', increment: batch.size)
+ @metrics_reporter.add_to_counter('otel.bsp.exported_spans', increment: span_array.size)
else
- OpenTelemetry.handle_error(exception: ExportError.new("Unable to export #{batch.size} spans"))
+ OpenTelemetry.handle_error(exception: ExportError.new(span_array))
@metrics_reporter.add_to_counter('otel.bsp.export.failure')
- report_dropped_spans(batch, reason: 'export-failure')
+ report_dropped_spans(span_array, reason: 'export-failure')
end
end
def report_dropped_spans(dropped_spans, reason:, function: nil)
@metrics_reporter.add_to_counter('otel.bsp.dropped_spans', increment: dropped_spans.size, labels: { 'reason' => reason, OpenTelemetry::SemanticConventions::Trace::CODE_FUNCTION => function }.compact)
- end
-
- def fetch_batch
- spans.shift(@batch_size).map!(&:to_span_data)
end
def lock
@mutex.synchronize do
yield