lib/kafka/async_producer.rb in ruby-kafka-1.3.0 vs lib/kafka/async_producer.rb in ruby-kafka-1.4.0
- old
+ new
@@ -210,51 +210,57 @@
def run
@logger.push_tags(@producer.to_s)
@logger.info "Starting async producer in the background..."
+ do_loop
+ rescue Exception => e
+ @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
+ @logger.error "Async producer crashed!"
+ ensure
+ @producer.shutdown
+ @logger.pop_tags
+ end
+
+ private
+
+ def do_loop
loop do
- operation, payload = @queue.pop
+ begin
+ operation, payload = @queue.pop
- case operation
- when :produce
- produce(payload[0], **payload[1])
- deliver_messages if threshold_reached?
- when :deliver_messages
- deliver_messages
- when :shutdown
- begin
- # Deliver any pending messages first.
- @producer.deliver_messages
- rescue Error => e
- @logger.error("Failed to deliver messages during shutdown: #{e.message}")
+ case operation
+ when :produce
+ produce(payload[0], **payload[1])
+ deliver_messages if threshold_reached?
+ when :deliver_messages
+ deliver_messages
+ when :shutdown
+ begin
+ # Deliver any pending messages first.
+ @producer.deliver_messages
+ rescue Error => e
+ @logger.error("Failed to deliver messages during shutdown: #{e.message}")
- @instrumenter.instrument("drop_messages.async_producer", {
- message_count: @producer.buffer_size + @queue.size,
- })
- end
+ @instrumenter.instrument("drop_messages.async_producer", {
+ message_count: @producer.buffer_size + @queue.size,
+ })
+ end
- # Stop the run loop.
- break
- else
- raise "Unknown operation #{operation.inspect}"
+ # Stop the run loop.
+ break
+ else
+ raise "Unknown operation #{operation.inspect}"
+ end
end
end
rescue Kafka::Error => e
@logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
@logger.info "Restarting in 10 seconds..."
sleep 10
retry
- rescue Exception => e
- @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
- @logger.error "Async producer crashed!"
- ensure
- @producer.shutdown
- @logger.pop_tags
end
-
- private
def produce(value, **kwargs)
retries = 0
begin
@producer.produce(value, **kwargs)