lib/rdkafka/producer/delivery_handle.rb in rdkafka-0.6.0 vs lib/rdkafka/producer/delivery_handle.rb in rdkafka-0.7.0

- old
+ new

@@ -8,10 +8,14 @@ :partition, :int, :offset, :int64 REGISTRY = {} + CURRENT_TIME = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }.freeze + + private_constant :CURRENT_TIME + def self.register(address, handle) REGISTRY[address] = handle end def self.remove(address) @@ -27,28 +31,28 @@ # Wait for the delivery report or raise an error if this takes longer than the timeout. # If there is a timeout this does not mean the message is not delivered, rdkafka might still be working on delivering the message. # In this case it is possible to call wait again. # - # @param timeout_in_seconds [Integer, nil] Number of seconds to wait before timing out. If this is nil it does not time out. + # @param max_wait_timeout [Numeric, nil] Amount of time to wait before timing out. If this is nil it does not time out. + # @param wait_timeout [Numeric] Amount of time we should wait before we recheck if there is a delivery report available # # @raise [RdkafkaError] When delivering the message failed # @raise [WaitTimeoutError] When the timeout has been reached and the handle is still pending # # @return [DeliveryReport] - def wait(timeout_in_seconds=60) - timeout = if timeout_in_seconds - Time.now.to_i + timeout_in_seconds + def wait(max_wait_timeout: 60, wait_timeout: 0.1) + timeout = if max_wait_timeout + CURRENT_TIME.call + max_wait_timeout else nil end loop do if pending? - if timeout && timeout <= Time.now.to_i - raise WaitTimeoutError.new("Waiting for delivery timed out after #{timeout_in_seconds} seconds") + if timeout && timeout <= CURRENT_TIME.call + raise WaitTimeoutError.new("Waiting for delivery timed out after #{max_wait_timeout} seconds") end - sleep 0.1 - next + sleep wait_timeout elsif self[:response] != 0 raise RdkafkaError.new(self[:response]) else return DeliveryReport.new(self[:partition], self[:offset]) end