lib/kafka/protocol/message_set.rb in ruby-kafka-0.1.5 vs lib/kafka/protocol/message_set.rb in ruby-kafka-0.1.6
- old
+ new
@@ -1,25 +1,67 @@
module Kafka
module Protocol
class MessageSet
attr_reader :messages
- def initialize(messages:)
+ def initialize(messages: [], compression_codec: nil)
@messages = messages
+ @compression_codec = compression_codec
end
+ def ==(other)
+ messages == other.messages
+ end
+
+ def encode(encoder)
+ if @compression_codec.nil?
+ encode_without_compression(encoder)
+ else
+ encode_with_compression(encoder)
+ end
+ end
+
def self.decode(decoder)
fetched_messages = []
until decoder.eof?
- offset = decoder.int64
- message_decoder = Decoder.from_string(decoder.bytes)
- message = Message.decode(message_decoder)
+ message = Message.decode(decoder)
- fetched_messages << [offset, message]
+ if message.compressed?
+ wrapped_message_set = message.decompress
+ fetched_messages.concat(wrapped_message_set.messages)
+ else
+ fetched_messages << message
+ end
end
new(messages: fetched_messages)
+ end
+
+ private
+
+ def encode_with_compression(encoder)
+ codec = @compression_codec
+
+ buffer = StringIO.new
+ encode_without_compression(Encoder.new(buffer))
+ data = codec.compress(buffer.string)
+
+ wrapper_message = Protocol::Message.new(
+ value: data,
+ attributes: codec.codec_id,
+ )
+
+ message_set = MessageSet.new(messages: [wrapper_message])
+ message_set.encode(encoder)
+ end
+
+ def encode_without_compression(encoder)
+ # Messages in a message set are *not* encoded as an array. Rather,
+ # they are written in sequence.
+ @messages.each do |message|
+ message.encode(encoder)
+ end
end
end
end
end