Sha256: 9cd516c58a6a71e484be0ca800a7cb49a3771b54ce19b78bdc172888c069ab67

Contents?: true

Size: 997 Bytes

Versions: 5

Compression:

Stored size: 997 Bytes

Contents

module Kafka

  # A pending message queue holds messages that have not yet been assigned to
  # a partition. It's designed to only remove messages once they've been
  # successfully handled.
  class PendingMessageQueue
    attr_reader :size, :bytesize

    def initialize
      @messages = []
      @size = 0
      @bytesize = 0
    end

    def write(message)
      @messages << message
      @size += 1
      @bytesize += message.bytesize
    end

    def empty?
      @messages.empty?
    end

    # Yields each message in the queue to the provided block, removing the
    # message after the block has processed it. If the block raises an
    # exception, the message will be retained in the queue.
    #
    # @yieldparam [PendingMessage] message
    # @return [nil]
    def dequeue_each(&block)
      until @messages.empty?
        message = @messages.first

        yield message

        @size -= 1
        @bytesize -= message.bytesize
        @messages.shift
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
ruby-kafka-0.3.2 lib/kafka/pending_message_queue.rb
ruby-kafka-0.3.1 lib/kafka/pending_message_queue.rb
ruby-kafka-0.3.0 lib/kafka/pending_message_queue.rb
ruby-kafka-0.2.0 lib/kafka/pending_message_queue.rb
ruby-kafka-0.1.7 lib/kafka/pending_message_queue.rb