lib/rdkafka/abstract_handle.rb in rdkafka-0.13.1 vs lib/rdkafka/abstract_handle.rb in rdkafka-0.14.0.rc1

- old
+ new

@@ -1,58 +1,71 @@ # frozen_string_literal: true -require "ffi" - module Rdkafka + # This class serves as an abstract base class to represent handles within the Rdkafka module. + # As a subclass of `FFI::Struct`, this class provides a blueprint for other specific handle + # classes to inherit from, ensuring they adhere to a particular structure and behavior. + # + # Subclasses must define their own layout, and the layout must start with: + # + # layout :pending, :bool, + # :response, :int class AbstractHandle < FFI::Struct - # Subclasses must define their own layout, and the layout must start with: - # - # layout :pending, :bool, - # :response, :int + include Helpers::Time + # Registry for registering all the handles. REGISTRY = {} - CURRENT_TIME = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }.freeze + class << self + # Adds handle to the register + # + # @param handle [AbstractHandle] any handle we want to register + def register(handle) + address = handle.to_ptr.address + REGISTRY[address] = handle + end - private_constant :CURRENT_TIME - - def self.register(handle) - address = handle.to_ptr.address - REGISTRY[address] = handle + # Removes handle from the register based on the handle address + # + # @param address [Integer] address of the registered handle we want to remove + def remove(address) + REGISTRY.delete(address) + end end - def self.remove(address) - REGISTRY.delete(address) - end # Whether the handle is still pending. # # @return [Boolean] def pending? self[:pending] end # Wait for the operation to complete or raise an error if this takes longer than the timeout. - # If there is a timeout this does not mean the operation failed, rdkafka might still be working on the operation. - # In this case it is possible to call wait again. + # If there is a timeout this does not mean the operation failed, rdkafka might still be working + # on the operation. In this case it is possible to call wait again. # - # @param max_wait_timeout [Numeric, nil] Amount of time to wait before timing out. If this is nil it does not time out. - # @param wait_timeout [Numeric] Amount of time we should wait before we recheck if the operation has completed + # @param max_wait_timeout [Numeric, nil] Amount of time to wait before timing out. + # If this is nil it does not time out. + # @param wait_timeout [Numeric] Amount of time we should wait before we recheck if the + # operation has completed # + # @return [Object] Operation-specific result + # # @raise [RdkafkaError] When the operation failed # @raise [WaitTimeoutError] When the timeout has been reached and the handle is still pending - # - # @return [Object] Operation-specific result def wait(max_wait_timeout: 60, wait_timeout: 0.1) timeout = if max_wait_timeout - CURRENT_TIME.call + max_wait_timeout + monotonic_now + max_wait_timeout else nil end loop do if pending? - if timeout && timeout <= CURRENT_TIME.call - raise WaitTimeoutError.new("Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds") + if timeout && timeout <= monotonic_now + raise WaitTimeoutError.new( + "Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds" + ) end sleep wait_timeout elsif self[:response] != 0 raise_error else