lib/kafka/protocol/message_set.rb in ruby-kafka-0.1.6 vs lib/kafka/protocol/message_set.rb in ruby-kafka-0.1.7
- old
+ new
@@ -1,24 +1,29 @@
module Kafka
module Protocol
class MessageSet
attr_reader :messages
- def initialize(messages: [], compression_codec: nil)
+ def initialize(messages: [], compression_codec: nil, compression_threshold: 1)
@messages = messages
@compression_codec = compression_codec
+ @compression_threshold = compression_threshold
end
+ def size
+ @messages.size
+ end
+
def ==(other)
messages == other.messages
end
def encode(encoder)
- if @compression_codec.nil?
- encode_without_compression(encoder)
- else
+ if compress?
encode_with_compression(encoder)
+ else
+ encode_without_compression(encoder)
end
end
def self.decode(decoder)
fetched_messages = []
@@ -36,9 +41,13 @@
new(messages: fetched_messages)
end
private
+
+ def compress?
+ !@compression_codec.nil? && size >= @compression_threshold
+ end
def encode_with_compression(encoder)
codec = @compression_codec
buffer = StringIO.new