lib/rdkafka/producer/delivery_handle.rb in rdkafka-0.8.0 vs lib/rdkafka/producer/delivery_handle.rb in rdkafka-0.8.1
- old
+ new
@@ -1,68 +1,22 @@
module Rdkafka
class Producer
# Handle to wait for a delivery report which is returned when
# producing a message.
- class DeliveryHandle < FFI::Struct
+ class DeliveryHandle < Rdkafka::AbstractHandle
layout :pending, :bool,
:response, :int,
:partition, :int,
:offset, :int64
- REGISTRY = {}
-
- CURRENT_TIME = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }.freeze
-
- private_constant :CURRENT_TIME
-
- def self.register(address, handle)
- REGISTRY[address] = handle
+ # @return [String] the name of the operation (e.g. "delivery")
+ def operation_name
+ "delivery"
end
- def self.remove(address)
- REGISTRY.delete(address)
+ # @return [DeliveryReport] a report on the delivery of the message
+ def create_result
+ DeliveryReport.new(self[:partition], self[:offset])
end
-
- # Whether the delivery handle is still pending.
- #
- # @return [Boolean]
- def pending?
- self[:pending]
- end
-
- # Wait for the delivery report or raise an error if this takes longer than the timeout.
- # If there is a timeout this does not mean the message is not delivered, rdkafka might still be working on delivering the message.
- # In this case it is possible to call wait again.
- #
- # @param max_wait_timeout [Numeric, nil] Amount of time to wait before timing out. If this is nil it does not time out.
- # @param wait_timeout [Numeric] Amount of time we should wait before we recheck if there is a delivery report available
- #
- # @raise [RdkafkaError] When delivering the message failed
- # @raise [WaitTimeoutError] When the timeout has been reached and the handle is still pending
- #
- # @return [DeliveryReport]
- def wait(max_wait_timeout: 60, wait_timeout: 0.1)
- timeout = if max_wait_timeout
- CURRENT_TIME.call + max_wait_timeout
- else
- nil
- end
- loop do
- if pending?
- if timeout && timeout <= CURRENT_TIME.call
- raise WaitTimeoutError.new("Waiting for delivery timed out after #{max_wait_timeout} seconds")
- end
- sleep wait_timeout
- elsif self[:response] != 0
- raise RdkafkaError.new(self[:response])
- else
- return DeliveryReport.new(self[:partition], self[:offset])
- end
- end
- end
-
- # Error that is raised when waiting for a delivery handle to complete
- # takes longer than the specified timeout.
- class WaitTimeoutError < RuntimeError; end
end
end
end