lib/kafka/async_producer.rb in ruby-kafka-1.3.0 vs lib/kafka/async_producer.rb in ruby-kafka-1.4.0

- old
+ new

@@ -210,51 +210,57 @@ def run @logger.push_tags(@producer.to_s) @logger.info "Starting async producer in the background..." + do_loop + rescue Exception => e + @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" + @logger.error "Async producer crashed!" + ensure + @producer.shutdown + @logger.pop_tags + end + + private + + def do_loop loop do - operation, payload = @queue.pop + begin + operation, payload = @queue.pop - case operation - when :produce - produce(payload[0], **payload[1]) - deliver_messages if threshold_reached? - when :deliver_messages - deliver_messages - when :shutdown - begin - # Deliver any pending messages first. - @producer.deliver_messages - rescue Error => e - @logger.error("Failed to deliver messages during shutdown: #{e.message}") + case operation + when :produce + produce(payload[0], **payload[1]) + deliver_messages if threshold_reached? + when :deliver_messages + deliver_messages + when :shutdown + begin + # Deliver any pending messages first. + @producer.deliver_messages + rescue Error => e + @logger.error("Failed to deliver messages during shutdown: #{e.message}") - @instrumenter.instrument("drop_messages.async_producer", { - message_count: @producer.buffer_size + @queue.size, - }) - end + @instrumenter.instrument("drop_messages.async_producer", { + message_count: @producer.buffer_size + @queue.size, + }) + end - # Stop the run loop. - break - else - raise "Unknown operation #{operation.inspect}" + # Stop the run loop. + break + else + raise "Unknown operation #{operation.inspect}" + end end end rescue Kafka::Error => e @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" @logger.info "Restarting in 10 seconds..." sleep 10 retry - rescue Exception => e - @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" - @logger.error "Async producer crashed!" - ensure - @producer.shutdown - @logger.pop_tags end - - private def produce(value, **kwargs) retries = 0 begin @producer.produce(value, **kwargs)