lib/kafka/async_producer.rb in ruby-kafka-0.4.2 vs lib/kafka/async_producer.rb in ruby-kafka-0.4.3
- old
+ new
@@ -140,22 +140,16 @@
private
def ensure_threads_running!
@worker_thread = nil unless @worker_thread && @worker_thread.alive?
- @worker_thread ||= start_thread { @worker.run }
+ @worker_thread ||= Thread.new { @worker.run }
@timer_thread = nil unless @timer_thread && @timer_thread.alive?
- @timer_thread ||= start_thread { @timer.run }
+ @timer_thread ||= Thread.new { @timer.run }
end
- def start_thread(&block)
- thread = Thread.new(&block)
- thread.abort_on_exception = true
- thread
- end
-
def buffer_overflow(topic)
@instrumenter.instrument("buffer_overflow.async_producer", {
topic: topic,
})
@@ -189,10 +183,12 @@
@instrumenter = instrumenter
@logger = logger
end
def run
+ @logger.info "Starting async producer in the background..."
+
loop do
operation, payload = @queue.pop
case operation
when :produce
@@ -216,9 +212,18 @@
break
else
raise "Unknown operation #{operation.inspect}"
end
end
+ rescue Kafka::Error => e
+ @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
+ @logger.info "Restarting in 10 seconds..."
+
+ sleep 10
+ retry
+ rescue Exception => e
+ @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
+ @logger.error "Async producer crashed!"
ensure
@producer.shutdown
end
private