lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.12.0 vs lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.12.1
- old
+ new
@@ -99,11 +99,11 @@
# @return [Integer] SUCCESS if no error occurred, FAILURE if a
# non-specific failure occurred, TIMEOUT if a timeout occurred.
def force_flush(timeout: nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/MethodLength
start_time = Time.now
snapshot = lock do
- reset_on_fork(restart_thread: false) if @keep_running
+ reset_on_fork if @keep_running
spans.shift(spans.size)
end
until snapshot.empty?
remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)
return TIMEOUT if remaining_timeout&.zero?
@@ -134,16 +134,17 @@
# @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] SUCCESS if no error occurred, FAILURE if a
# non-specific failure occurred, TIMEOUT if a timeout occurred.
def shutdown(timeout: nil)
start_time = Time.now
- lock do
+ thread = lock do
@keep_running = false
@condition.signal
+ @thread
end
- @thread.join(timeout)
+ thread&.join(timeout)
force_flush(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
@exporter.shutdown(timeout: OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time))
dropped_spans = lock { spans.size }
report_dropped_spans(dropped_spans, reason: 'terminating') if dropped_spans.positive?
end
@@ -153,11 +154,10 @@
attr_reader :spans, :max_queue_size, :batch_size
def work # rubocop:disable Metrics/AbcSize
loop do
batch = lock do
- reset_on_fork(restart_thread: false)
@condition.wait(@mutex, @delay_seconds) if spans.size < batch_size && @keep_running
@condition.wait(@mutex, @delay_seconds) while spans.empty? && @keep_running
return unless @keep_running
fetch_batch
@@ -173,10 +173,10 @@
pid = Process.pid
return if @pid == pid
@pid = pid
spans.clear
- @thread = Thread.new { work } if restart_thread
+ @thread = restart_thread ? Thread.new { work } : nil
end
def export_batch(batch, timeout: @exporter_timeout_seconds)
result_code = @export_mutex.synchronize { @exporter.export(batch, timeout: timeout) }
report_result(result_code, batch)