Sha256: eb91f0ba3dd6a24362da5be65de3d3ecff9458a58f3cfea13bcc933d8164fb26

Contents?: true

Size: 1.88 KB

Versions: 1

Compression:

Stored size: 1.88 KB

Contents

module Rdkafka
  class Producer
    # Handle to wait for a delivery report which is returned when
    # producing a message.
    class DeliveryHandle < ::FFI::Struct
      layout :pending, :bool,
             :response, :int,
             :partition, :int,
             :offset, :int64

      # Whether the delivery handle is still pending.
      #
      # @return [Boolean]
      def pending?
        self[:pending]
      end

      # Wait for the delivery report or raise an error if this takes longer than the timeout.
      # If there is a timeout this does not mean the message is not delivered, rdkafka might still be working on delivering the message.
      # In this case it is possible to call wait again.
      #
      # @param timeout_in_seconds [Integer, nil] Number of seconds to wait before timing out. If this is nil it does not time out.
      #
      # @raise [RdkafkaError] When delivering the message failed
      # @raise [WaitTimeoutError] When the timeout has been reached and the handle is still pending
      #
      # @return [DeliveryReport]
      def wait(timeout_in_seconds=60)
        timeout = if timeout_in_seconds
                    Time.now.to_i + timeout_in_seconds
                  else
                    nil
                  end
        loop do
          if pending?
            if timeout && timeout <= Time.now.to_i
              raise WaitTimeoutError.new("Waiting for delivery timed out after #{timeout_in_seconds} seconds")
            end
            sleep 0.1
            next
          elsif self[:response] != 0
            raise RdkafkaError.new(self[:response])
          else
            return DeliveryReport.new(self[:partition], self[:offset])
          end
        end
      end

      # Error that is raised when waiting for a delivery handle to complete
      # takes longer than the specified timeout.
      class WaitTimeoutError < RuntimeError; end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
rdkafka-0.2.0 lib/rdkafka/producer/delivery_handle.rb