lib/rdkafka/producer/delivery_handle.rb in rdkafka-0.6.0 vs lib/rdkafka/producer/delivery_handle.rb in rdkafka-0.7.0
- old
+ new
@@ -8,10 +8,14 @@
:partition, :int,
:offset, :int64
REGISTRY = {}
+ CURRENT_TIME = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }.freeze
+
+ private_constant :CURRENT_TIME
+
def self.register(address, handle)
REGISTRY[address] = handle
end
def self.remove(address)
@@ -27,28 +31,28 @@
# Wait for the delivery report or raise an error if this takes longer than the timeout.
# If there is a timeout this does not mean the message is not delivered, rdkafka might still be working on delivering the message.
# In this case it is possible to call wait again.
#
- # @param timeout_in_seconds [Integer, nil] Number of seconds to wait before timing out. If this is nil it does not time out.
+ # @param max_wait_timeout [Numeric, nil] Amount of time to wait before timing out. If this is nil it does not time out.
+ # @param wait_timeout [Numeric] Amount of time we should wait before we recheck if there is a delivery report available
#
# @raise [RdkafkaError] When delivering the message failed
# @raise [WaitTimeoutError] When the timeout has been reached and the handle is still pending
#
# @return [DeliveryReport]
- def wait(timeout_in_seconds=60)
- timeout = if timeout_in_seconds
- Time.now.to_i + timeout_in_seconds
+ def wait(max_wait_timeout: 60, wait_timeout: 0.1)
+ timeout = if max_wait_timeout
+ CURRENT_TIME.call + max_wait_timeout
else
nil
end
loop do
if pending?
- if timeout && timeout <= Time.now.to_i
- raise WaitTimeoutError.new("Waiting for delivery timed out after #{timeout_in_seconds} seconds")
+ if timeout && timeout <= CURRENT_TIME.call
+ raise WaitTimeoutError.new("Waiting for delivery timed out after #{max_wait_timeout} seconds")
end
- sleep 0.1
- next
+ sleep wait_timeout
elsif self[:response] != 0
raise RdkafkaError.new(self[:response])
else
return DeliveryReport.new(self[:partition], self[:offset])
end