lib/rdkafka/abstract_handle.rb in rdkafka-0.13.1 vs lib/rdkafka/abstract_handle.rb in rdkafka-0.14.0.rc1
- old
+ new
@@ -1,58 +1,71 @@
# frozen_string_literal: true
-require "ffi"
-
module Rdkafka
+ # This class serves as an abstract base class to represent handles within the Rdkafka module.
+ # As a subclass of `FFI::Struct`, this class provides a blueprint for other specific handle
+ # classes to inherit from, ensuring they adhere to a particular structure and behavior.
+ #
+ # Subclasses must define their own layout, and the layout must start with:
+ #
+ # layout :pending, :bool,
+ # :response, :int
class AbstractHandle < FFI::Struct
- # Subclasses must define their own layout, and the layout must start with:
- #
- # layout :pending, :bool,
- # :response, :int
+ include Helpers::Time
+ # Registry for registering all the handles.
REGISTRY = {}
- CURRENT_TIME = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }.freeze
+ class << self
+ # Adds handle to the register
+ #
+ # @param handle [AbstractHandle] any handle we want to register
+ def register(handle)
+ address = handle.to_ptr.address
+ REGISTRY[address] = handle
+ end
- private_constant :CURRENT_TIME
-
- def self.register(handle)
- address = handle.to_ptr.address
- REGISTRY[address] = handle
+ # Removes handle from the register based on the handle address
+ #
+ # @param address [Integer] address of the registered handle we want to remove
+ def remove(address)
+ REGISTRY.delete(address)
+ end
end
- def self.remove(address)
- REGISTRY.delete(address)
- end
# Whether the handle is still pending.
#
# @return [Boolean]
def pending?
self[:pending]
end
# Wait for the operation to complete or raise an error if this takes longer than the timeout.
- # If there is a timeout this does not mean the operation failed, rdkafka might still be working on the operation.
- # In this case it is possible to call wait again.
+ # If there is a timeout this does not mean the operation failed, rdkafka might still be working
+ # on the operation. In this case it is possible to call wait again.
#
- # @param max_wait_timeout [Numeric, nil] Amount of time to wait before timing out. If this is nil it does not time out.
- # @param wait_timeout [Numeric] Amount of time we should wait before we recheck if the operation has completed
+ # @param max_wait_timeout [Numeric, nil] Amount of time to wait before timing out.
+ # If this is nil it does not time out.
+ # @param wait_timeout [Numeric] Amount of time we should wait before we recheck if the
+ # operation has completed
#
+ # @return [Object] Operation-specific result
+ #
# @raise [RdkafkaError] When the operation failed
# @raise [WaitTimeoutError] When the timeout has been reached and the handle is still pending
- #
- # @return [Object] Operation-specific result
def wait(max_wait_timeout: 60, wait_timeout: 0.1)
timeout = if max_wait_timeout
- CURRENT_TIME.call + max_wait_timeout
+ monotonic_now + max_wait_timeout
else
nil
end
loop do
if pending?
- if timeout && timeout <= CURRENT_TIME.call
- raise WaitTimeoutError.new("Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds")
+ if timeout && timeout <= monotonic_now
+ raise WaitTimeoutError.new(
+ "Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds"
+ )
end
sleep wait_timeout
elsif self[:response] != 0
raise_error
else