lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.6.0 vs lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.7.0
- old
+ new
@@ -54,23 +54,26 @@
@keep_running = true
@delay_seconds = schedule_delay_millis / 1000.0
@max_queue_size = max_queue_size
@batch_size = max_export_batch_size
@spans = []
- @thread = Thread.new { work }
+ @pid = nil
+ @thread = nil
+ reset_on_fork
end
# does nothing for this processor
- def on_start(span)
+ def on_start(span, parent_context)
# noop
end
# adds a span to the batcher, threadsafe may block on lock
def on_finish(span) # rubocop:disable Metrics/AbcSize
return unless span.context.trace_flags.sampled?
lock do
+ reset_on_fork
n = spans.size + 1 - max_queue_size
spans.shift(n) if n.positive?
spans << span
@condition.signal if spans.size > max_queue_size / 2
end
@@ -82,21 +85,31 @@
#
# This method should only be called in cases where it is absolutely
# necessary, such as when using some FaaS providers that may suspend
# the process after an invocation, but before the `Processor` exports
# the completed spans.
+ #
+ # @return [Integer] SUCCESS if no error occurred, FAILURE if a
+ # non-specific failure occurred, TIMEOUT if a timeout occurred.
def force_flush
- snapshot = lock { spans.shift(spans.size) }
+ snapshot = lock do
+ reset_on_fork(restart_thread: false) if @keep_running
+ spans.shift(spans.size)
+ end
until snapshot.empty?
batch = snapshot.shift(@batch_size).map!(&:to_span_data)
result_code = @exporter.export(batch)
report_result(result_code, batch)
end
+ SUCCESS
end
# shuts the consumer thread down and flushes the current accumulated buffer
# will block until the thread is finished
+ #
+ # @return [Integer] SUCCESS if no error occurred, FAILURE if a
+ # non-specific failure occurred, TIMEOUT if a timeout occurred.
def shutdown
lock do
@keep_running = false
@condition.signal
end
@@ -111,18 +124,28 @@
attr_reader :spans, :max_queue_size, :batch_size
def work
loop do
batch = lock do
+ reset_on_fork(restart_thread: false)
@condition.wait(@mutex, @delay_seconds) if spans.size < batch_size && @keep_running
@condition.wait(@mutex, @delay_seconds) while spans.empty? && @keep_running
return unless @keep_running
fetch_batch
end
export_batch(batch)
end
+ end
+
+ def reset_on_fork(restart_thread: true)
+ pid = Process.pid
+ return if @pid == pid
+
+ @pid = pid
+ spans.clear
+ @thread = Thread.new { work } if restart_thread
end
def export_batch(batch)
result_code = export_with_timeout(batch)
report_result(result_code, batch)