lib/kafka/async_producer.rb in ruby-kafka-0.4.1 vs lib/kafka/async_producer.rb in ruby-kafka-0.4.2

- old
+ new

@@ -81,10 +81,12 @@ @worker = Worker.new( queue: @queue, producer: sync_producer, delivery_threshold: delivery_threshold, + instrumenter: instrumenter, + logger: logger, ) # The timer will no-op if the delivery interval is zero. @timer = Timer.new(queue: @queue, interval: delivery_interval) end @@ -178,14 +180,16 @@ end end end class Worker - def initialize(queue:, producer:, delivery_threshold:) + def initialize(queue:, producer:, delivery_threshold:, instrumenter:, logger:) @queue = queue @producer = producer @delivery_threshold = delivery_threshold + @instrumenter = instrumenter + @logger = logger end def run loop do operation, payload = @queue.pop @@ -195,11 +199,19 @@ produce(*payload) deliver_messages if threshold_reached? when :deliver_messages deliver_messages when :shutdown - # Deliver any pending messages first. - deliver_messages + begin + # Deliver any pending messages first. + @producer.deliver_messages + rescue Error => e + @logger.error("Failed to deliver messages during shutdown: #{e.message}") + + @instrumenter.instrument("drop_messages.async_producer", { + message_count: @producer.buffer_size + @queue.size, + }) + end # Stop the run loop. break else raise "Unknown operation #{operation.inspect}"