Sha256: dc27f5e71045197042f8e440f817107a5997d06d11ed8464222edf8b8bbe023d

Contents?: true

Size: 941 Bytes

Versions: 2

Compression:

Stored size: 941 Bytes

Contents

module Kafka
  module Protocol
    # An ordered collection of messages that can be written to an encoder or
    # read back from a decoder.
    class MessageSet
      # @return [Array] the messages held by this set, in order.
      attr_reader :messages

      # @param messages [Array] the messages to wrap; defaults to an empty list.
      def initialize(messages: [])
        @messages = messages
      end

      # @return [Integer] the number of messages in the set.
      def size
        @messages.size
      end

      # Two message sets are equal when their message lists are equal.
      #
      # Objects that do not expose `messages` (for example `nil`) compare as
      # not equal instead of raising NoMethodError.
      #
      # @param other [Object] the object to compare against.
      # @return [Boolean]
      def ==(other)
        other.respond_to?(:messages) && messages == other.messages
      end

      # Writes every message to the encoder, one after another.
      #
      # Messages in a message set are *not* encoded as an array. Rather,
      # they are written in sequence.
      #
      # @param encoder [#write-like] object handed to each message's `encode`.
      # @return [Array] the list of messages (the return value of `each`).
      def encode(encoder)
        @messages.each { |message| message.encode(encoder) }
      end

      # Reads messages from the decoder until it reports end of input.
      #
      # A message that reports itself as compressed is decompressed, and the
      # messages of the resulting wrapped set are appended in place of the
      # wrapper, so the returned set is flat.
      #
      # NOTE(review): a truncated trailing message is not handled here; any
      # error raised by `Message.decode` propagates to the caller.
      #
      # @param decoder [#eof?] source passed to `Message.decode`.
      # @return [MessageSet] a new set containing the decoded messages.
      def self.decode(decoder)
        fetched_messages = []

        until decoder.eof?
          message = Message.decode(decoder)

          if message.compressed?
            # Replace the wrapper with the messages it contains.
            fetched_messages.concat(message.decompress.messages)
          else
            fetched_messages << message
          end
        end

        new(messages: fetched_messages)
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
ruby-kafka-0.3.0 lib/kafka/protocol/message_set.rb
ruby-kafka-0.2.0 lib/kafka/protocol/message_set.rb