lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.6.0 vs lib/opentelemetry/sdk/trace/export/batch_span_processor.rb in opentelemetry-sdk-0.7.0

- old
+ new

@@ -54,23 +54,26 @@ @keep_running = true @delay_seconds = schedule_delay_millis / 1000.0 @max_queue_size = max_queue_size @batch_size = max_export_batch_size @spans = [] - @thread = Thread.new { work } + @pid = nil + @thread = nil + reset_on_fork end # does nothing for this processor - def on_start(span) + def on_start(span, parent_context) # noop end # adds a span to the batcher, threadsafe may block on lock def on_finish(span) # rubocop:disable Metrics/AbcSize return unless span.context.trace_flags.sampled? lock do + reset_on_fork n = spans.size + 1 - max_queue_size spans.shift(n) if n.positive? spans << span @condition.signal if spans.size > max_queue_size / 2 end @@ -82,21 +85,31 @@ # # This method should only be called in cases where it is absolutely # necessary, such as when using some FaaS providers that may suspend # the process after an invocation, but before the `Processor` exports # the completed spans. + # + # @return [Integer] SUCCESS if no error occurred, FAILURE if a + # non-specific failure occurred, TIMEOUT if a timeout occurred. def force_flush - snapshot = lock { spans.shift(spans.size) } + snapshot = lock do + reset_on_fork(restart_thread: false) if @keep_running + spans.shift(spans.size) + end until snapshot.empty? batch = snapshot.shift(@batch_size).map!(&:to_span_data) result_code = @exporter.export(batch) report_result(result_code, batch) end + SUCCESS end # shuts the consumer thread down and flushes the current accumulated buffer # will block until the thread is finished + # + # @return [Integer] SUCCESS if no error occurred, FAILURE if a + # non-specific failure occurred, TIMEOUT if a timeout occurred. def shutdown lock do @keep_running = false @condition.signal end @@ -111,18 +124,28 @@ attr_reader :spans, :max_queue_size, :batch_size def work loop do batch = lock do + reset_on_fork(restart_thread: false) @condition.wait(@mutex, @delay_seconds) if spans.size < batch_size && @keep_running @condition.wait(@mutex, @delay_seconds) while spans.empty? && @keep_running return unless @keep_running fetch_batch end export_batch(batch) end + end + + def reset_on_fork(restart_thread: true) + pid = Process.pid + return if @pid == pid + + @pid = pid + spans.clear + @thread = Thread.new { work } if restart_thread end def export_batch(batch) result_code = export_with_timeout(batch) report_result(result_code, batch)