Sha256: d8436a7865b30a493951496a2c3b6a9525f366bc343069d888ef45c52fd9ec65

Contents?: true

Size: 1.78 KB

Versions: 34

Compression:

Stored size: 1.78 KB

Contents

require "kafka/compression"

module Kafka
  # Wraps a message set in a single compressed message using a configured codec.
  #
  # Compression is skipped, and the message set handed back untouched, when no
  # codec was resolved or when the set holds fewer messages than the threshold.
  #
  # ## Instrumentation
  #
  # Each time a message set is actually compressed, the instrumenter is asked
  # to instrument `compress.compressor`, surfacing as the notification
  # `compress.compressor.kafka` with this payload:
  #
  # * `message_count` – how many messages the message set contained.
  # * `uncompressed_bytesize` – byte size of the encoded data before compression.
  # * `compressed_bytesize` – byte size of the data after compression.
  #
  class Compressor
    # @param codec_name [Symbol, nil] name handed to `Compression.find_codec`.
    # @param threshold [Integer] the smallest message count for which a message
    #   set gets compressed.
    # @param instrumenter [#instrument] receives the compression notification.
    def initialize(codec_name: nil, threshold: 1, instrumenter:)
      @instrumenter = instrumenter
      @threshold = threshold
      @codec = Compression.find_codec(codec_name)
    end

    # Compresses the given message set when a codec is available and the set
    # is large enough; otherwise returns the very same message set.
    #
    # @param message_set [Protocol::MessageSet]
    # @return [Protocol::MessageSet] either the input itself or a new set
    #   holding one wrapper message whose value is the compressed data.
    def compress(message_set)
      return message_set if @codec.nil?
      return message_set if message_set.size < @threshold

      # The compressed bytes travel as the value of a single message that is
      # tagged with the id of the codec that produced them.
      wrapper = Protocol::Message.new(
        value: compress_data(message_set),
        codec_id: @codec.codec_id,
      )

      Protocol::MessageSet.new(messages: [wrapper])
    end

    private

    # Encodes the message set, compresses the encoded bytes inside an
    # instrumented block and records the size statistics on the notification.
    #
    # @param message_set [Protocol::MessageSet]
    # @return the compressed data, as returned by the instrumented block.
    def compress_data(message_set)
      encoded = Protocol::Encoder.encode_with(message_set)

      @instrumenter.instrument("compress.compressor") do |payload|
        # `tap` fills in the payload after compressing and still yields the
        # compressed data as the block's result.
        @codec.compress(encoded).tap do |packed|
          payload[:message_count] = message_set.size
          payload[:uncompressed_bytesize] = encoded.bytesize
          payload[:compressed_bytesize] = packed.bytesize
        end
      end
    end
  end
end

Version data entries

34 entries across 34 versions & 1 rubygems

Version Path
ruby-kafka-0.5.1 lib/kafka/compressor.rb
ruby-kafka-0.5.1.beta2 lib/kafka/compressor.rb
ruby-kafka-0.5.1.beta1 lib/kafka/compressor.rb
ruby-kafka-0.4.4 lib/kafka/compressor.rb
ruby-kafka-0.5.0 lib/kafka/compressor.rb
ruby-kafka-0.5.0.beta6 lib/kafka/compressor.rb
ruby-kafka-0.5.0.beta5 lib/kafka/compressor.rb
ruby-kafka-0.5.0.beta4 lib/kafka/compressor.rb
ruby-kafka-0.5.0.beta3 lib/kafka/compressor.rb
ruby-kafka-0.5.0.beta2 lib/kafka/compressor.rb
ruby-kafka-0.4.3 lib/kafka/compressor.rb
ruby-kafka-0.4.2 lib/kafka/compressor.rb
ruby-kafka-0.4.1 lib/kafka/compressor.rb
ruby-kafka-0.4.0 lib/kafka/compressor.rb
ruby-kafka-0.4.0.beta1 lib/kafka/compressor.rb
ruby-kafka-0.3.18.beta2 lib/kafka/compressor.rb
ruby-kafka-0.3.18.beta1 lib/kafka/compressor.rb
ruby-kafka-0.3.17 lib/kafka/compressor.rb
ruby-kafka-0.3.16 lib/kafka/compressor.rb
ruby-kafka-0.3.16.beta2 lib/kafka/compressor.rb