Sha256: d50f3aa3876c7b36bf69e0ea669c75dcd8892fbe910ee38e57ba1a531ca106a6
Contents?: true
Size: 1.35 KB
Versions: 4
Compression:
Stored size: 1.35 KB
Contents
module Kafka
  module Protocol
    # An ordered collection of protocol messages that is encoded and decoded
    # as a unit.
    #
    # Messages in a set are written back to back rather than as a
    # length-prefixed array, so decoding keeps reading messages until the
    # decoder reports end of input.
    class MessageSet
      # @return [Array] the messages held by this set, in order.
      attr_reader :messages

      # @param messages [Array] the messages to wrap; defaults to an empty list.
      def initialize(messages: [])
        @messages = messages
      end

      # @return [Integer] the number of messages in the set.
      def size
        @messages.size
      end

      # Compares two sets by their message lists.
      #
      # Objects that do not expose `messages` (e.g. `nil`) are simply unequal
      # rather than triggering a NoMethodError.
      #
      # @param other [Object] the object to compare against.
      # @return [Boolean] true when `other` exposes an equal list of messages.
      def ==(other)
        other.respond_to?(:messages) && messages == other.messages
      end

      # Writes every message to the encoder, one after another.
      #
      # @param encoder [Object] passed through to each message's `encode`.
      # @return [Array] the messages that were written.
      def encode(encoder)
        # Messages in a message set are *not* encoded as an array. Rather,
        # they are written in sequence.
        @messages.each do |message|
          message.encode(encoder)
        end
      end

      # Reads messages from the decoder until it reports end of input.
      #
      # Compressed messages are decompressed and the messages they wrap are
      # added in their place.
      #
      # @param decoder [Object] must respond to `eof?` and be accepted by
      #   `Message.decode`.
      # @return [MessageSet] a set holding every fully decoded message.
      # @raise [MessageTooLargeToRead] when not even the first message could
      #   be decoded completely.
      def self.decode(decoder)
        fetched_messages = []

        until decoder.eof?
          begin
            message = Message.decode(decoder)

            if message.compressed?
              wrapped_message_set = message.decompress
              fetched_messages.concat(wrapped_message_set.messages)
            else
              fetched_messages << message
            end
          rescue EOFError
            # If the first message in the set is truncated, it's likely because the
            # message is larger than the maximum size that we have asked for.
            raise MessageTooLargeToRead if fetched_messages.empty?

            # Otherwise we tried to decode a partial message at the end of the
            # set; just skip it.
          end
        end

        new(messages: fetched_messages)
      end
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems