lib/kafka/async_producer.rb in ruby-kafka-0.4.2 vs lib/kafka/async_producer.rb in ruby-kafka-0.4.3

- old
+ new

@@ -140,22 +140,16 @@ private def ensure_threads_running! @worker_thread = nil unless @worker_thread && @worker_thread.alive? - @worker_thread ||= start_thread { @worker.run } + @worker_thread ||= Thread.new { @worker.run } @timer_thread = nil unless @timer_thread && @timer_thread.alive? - @timer_thread ||= start_thread { @timer.run } + @timer_thread ||= Thread.new { @timer.run } end - def start_thread(&block) - thread = Thread.new(&block) - thread.abort_on_exception = true - thread - end - def buffer_overflow(topic) @instrumenter.instrument("buffer_overflow.async_producer", { topic: topic, }) @@ -189,10 +183,12 @@ @instrumenter = instrumenter @logger = logger end def run + @logger.info "Starting async producer in the background..." + loop do operation, payload = @queue.pop case operation when :produce @@ -216,9 +212,18 @@ break else raise "Unknown operation #{operation.inspect}" end end + rescue Kafka::Error => e + @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" + @logger.info "Restarting in 10 seconds..." + + sleep 10 + retry + rescue Exception => e + @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" + @logger.error "Async producer crashed!" ensure @producer.shutdown end private