lib/kafka/async_producer.rb in ruby-kafka-0.4.1 vs lib/kafka/async_producer.rb in ruby-kafka-0.4.2
- old
+ new
@@ -81,10 +81,12 @@
@worker = Worker.new(
queue: @queue,
producer: sync_producer,
delivery_threshold: delivery_threshold,
+ instrumenter: instrumenter,
+ logger: logger,
)
# The timer will no-op if the delivery interval is zero.
@timer = Timer.new(queue: @queue, interval: delivery_interval)
end
@@ -178,14 +180,16 @@
end
end
end
class Worker
- def initialize(queue:, producer:, delivery_threshold:)
+ def initialize(queue:, producer:, delivery_threshold:, instrumenter:, logger:)
@queue = queue
@producer = producer
@delivery_threshold = delivery_threshold
+ @instrumenter = instrumenter
+ @logger = logger
end
def run
loop do
operation, payload = @queue.pop
@@ -195,11 +199,19 @@
produce(*payload)
deliver_messages if threshold_reached?
when :deliver_messages
deliver_messages
when :shutdown
- # Deliver any pending messages first.
- deliver_messages
+ begin
+ # Deliver any pending messages first.
+ @producer.deliver_messages
+ rescue Error => e
+ @logger.error("Failed to deliver messages during shutdown: #{e.message}")
+
+ @instrumenter.instrument("drop_messages.async_producer", {
+ message_count: @producer.buffer_size + @queue.size,
+ })
+ end
# Stop the run loop.
break
else
raise "Unknown operation #{operation.inspect}"