lib/kafka/protocol/message_set.rb in ruby-kafka-0.1.5 vs lib/kafka/protocol/message_set.rb in ruby-kafka-0.1.6

- old
+ new

@@ -1,25 +1,67 @@ module Kafka module Protocol class MessageSet attr_reader :messages - def initialize(messages:) + def initialize(messages: [], compression_codec: nil) @messages = messages + @compression_codec = compression_codec end + def ==(other) + messages == other.messages + end + + def encode(encoder) + if @compression_codec.nil? + encode_without_compression(encoder) + else + encode_with_compression(encoder) + end + end + def self.decode(decoder) fetched_messages = [] until decoder.eof? - offset = decoder.int64 - message_decoder = Decoder.from_string(decoder.bytes) - message = Message.decode(message_decoder) + message = Message.decode(decoder) - fetched_messages << [offset, message] + if message.compressed? + wrapped_message_set = message.decompress + fetched_messages.concat(wrapped_message_set.messages) + else + fetched_messages << message + end end new(messages: fetched_messages) + end + + private + + def encode_with_compression(encoder) + codec = @compression_codec + + buffer = StringIO.new + encode_without_compression(Encoder.new(buffer)) + data = codec.compress(buffer.string) + + wrapper_message = Protocol::Message.new( + value: data, + attributes: codec.codec_id, + ) + + message_set = MessageSet.new(messages: [wrapper_message]) + message_set.encode(encoder) + end + + def encode_without_compression(encoder) + # Messages in a message set are *not* encoded as an array. Rather, + # they are written in sequence. + @messages.each do |message| + message.encode(encoder) + end end end end end