Sha256: d8436a7865b30a493951496a2c3b6a9525f366bc343069d888ef45c52fd9ec65
Contents?: true
Size: 1.78 KB
Versions: 34
Compression:
Stored size: 1.78 KB
Contents
require "kafka/compression" module Kafka # Compresses message sets using a specified codec. # # A message set is only compressed if its size meets the defined threshold. # # ## Instrumentation # # Whenever a message set is compressed, the notification # `compress.compressor.kafka` will be emitted with the following payload: # # * `message_count` – the number of messages in the message set. # * `uncompressed_bytesize` – the byte size of the original data. # * `compressed_bytesize` – the byte size of the compressed data. # class Compressor # @param codec_name [Symbol, nil] # @param threshold [Integer] the minimum number of messages in a message set # that will trigger compression. def initialize(codec_name: nil, threshold: 1, instrumenter:) @codec = Compression.find_codec(codec_name) @threshold = threshold @instrumenter = instrumenter end # @param message_set [Protocol::MessageSet] # @return [Protocol::MessageSet] def compress(message_set) return message_set if @codec.nil? || message_set.size < @threshold compressed_data = compress_data(message_set) wrapper_message = Protocol::Message.new( value: compressed_data, codec_id: @codec.codec_id, ) Protocol::MessageSet.new(messages: [wrapper_message]) end private def compress_data(message_set) data = Protocol::Encoder.encode_with(message_set) @instrumenter.instrument("compress.compressor") do |notification| compressed_data = @codec.compress(data) notification[:message_count] = message_set.size notification[:uncompressed_bytesize] = data.bytesize notification[:compressed_bytesize] = compressed_data.bytesize compressed_data end end end end
Version data entries
34 entries across 34 versions & 1 rubygems