Sha256: 9cd516c58a6a71e484be0ca800a7cb49a3771b54ce19b78bdc172888c069ab67
Contents?: true
Size: 997 Bytes
Versions: 5
Compression:
Stored size: 997 Bytes
Contents
module Kafka

  # A pending message queue holds messages that have not yet been assigned to
  # a partition. It's designed to only remove messages once they've been
  # successfully handled.
  class PendingMessageQueue
    attr_reader :size, :bytesize

    def initialize
      @messages = []
      @size = 0
      @bytesize = 0
    end

    def write(message)
      @messages << message
      @size += 1
      @bytesize += message.bytesize
    end

    def empty?
      @messages.empty?
    end

    # Yields each message in the queue to the provided block, removing the
    # message after the block has processed it. If the block raises an
    # exception, the message will be retained in the queue.
    #
    # @yieldparam [PendingMessage] message
    # @return [nil]
    def dequeue_each(&block)
      until @messages.empty?
        message = @messages.first

        yield message

        @size -= 1
        @bytesize -= message.bytesize
        @messages.shift
      end
    end
  end
end
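The retain-on-failure behaviour described in the comment on dequeue_each can be illustrated with a short sketch. The class body is repeated from the listing so the snippet runs on its own; DemoMessage is a hypothetical stand-in (not part of the gem) for any object that responds to bytesize, and the simulated failure is an assumption made purely for illustration.

```ruby
# Hypothetical message type: only #bytesize is required by the queue.
DemoMessage = Struct.new(:value) do
  def bytesize
    value.bytesize
  end
end

module Kafka
  # Repeated from the listing so this example is self-contained.
  class PendingMessageQueue
    attr_reader :size, :bytesize

    def initialize
      @messages = []
      @size = 0
      @bytesize = 0
    end

    def write(message)
      @messages << message
      @size += 1
      @bytesize += message.bytesize
    end

    def empty?
      @messages.empty?
    end

    def dequeue_each(&block)
      until @messages.empty?
        message = @messages.first

        # If the block raises here, the counters and the array are untouched,
        # so the message stays at the head of the queue.
        yield message

        @size -= 1
        @bytesize -= message.bytesize
        @messages.shift
      end
    end
  end
end

queue = Kafka::PendingMessageQueue.new
queue.write(DemoMessage.new("a"))
queue.write(DemoMessage.new("bb"))
queue.write(DemoMessage.new("ccc"))

handled = []

begin
  queue.dequeue_each do |message|
    # Simulated failure while handling the second message.
    raise "simulated delivery failure" if message.value == "bb"
    handled << message.value
  end
rescue RuntimeError
  # "a" was removed; "bb" and "ccc" remain queued.
end

size_after_failure = queue.size
bytesize_after_failure = queue.bytesize

# A second pass resumes from the message that failed.
queue.dequeue_each { |message| handled << message.value }
```

Because the message is read with first and only removed with shift after the block returns, a retry picks up at the failed message rather than skipping it. The trade-off is that a block which always raises for a given message will block everything queued behind it.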
Version data entries
5 entries across 5 versions & 1 rubygems