Sha256: 44b43c6d89256458d45455ec924c869484fe72ede11cf3be1e5fc42e9a7d856b

Contents?: true

Size: 1.96 KB

Versions: 12

Compression:

Stored size: 1.96 KB

Contents

require "kafka/compression"

module Kafka
  # Wraps a message set in a single compressed message when a codec has been
  # configured and the set is large enough.
  #
  # Sets holding fewer messages than the threshold are handed back untouched,
  # as is every set when no codec was configured.
  #
  # ## Instrumentation
  #
  # Every compression run is reported to the instrumenter under the event name
  # `compress.compressor` (documented as reaching subscribers as
  # `compress.compressor.kafka`; the suffix is presumably appended by the
  # instrumenter). The payload carries:
  #
  # * `message_count` – how many messages the message set held.
  # * `uncompressed_bytesize` – byte size of the encoded set before compression.
  # * `compressed_bytesize` – byte size of the codec's output.
  #
  class Compressor
    # @param codec_name [Symbol, nil] codec to look up; nil disables compression.
    # @param threshold [Integer] the smallest number of messages in a message
    #   set for which compression is applied.
    # @param instrumenter [#instrument] receives the compression notification.
    def initialize(codec_name: nil, threshold: 1, instrumenter:)
      @instrumenter = instrumenter
      @threshold = threshold

      # A falsy codec name is stored as given (no lookup), which turns
      # #compress into a pass-through when it is nil.
      @codec = codec_name ? Compression.find_codec(codec_name) : codec_name
    end

    # Returns the given set unchanged when compression does not apply;
    # otherwise returns a new set holding one wrapper message whose value is
    # the compressed encoding of the original set.
    #
    # @param message_set [Protocol::MessageSet]
    # @param offset [Integer] used to simulate broker behaviour in tests
    # @return [Protocol::MessageSet]
    def compress(message_set, offset: -1)
      # No codec configured: nothing to do.
      return message_set if @codec.nil?
      # Too few messages to be worth compressing.
      return message_set if message_set.size < @threshold

      envelope = Protocol::Message.new(
        value: compress_data(message_set),
        codec_id: @codec.codec_id,
        offset: offset
      )

      Protocol::MessageSet.new(messages: [envelope])
    end

    private

    # Encodes the message set, runs the codec over the encoded bytes inside an
    # instrumentation block, and records the before/after sizes on the
    # notification payload. Returns whatever the instrumenter call returns.
    def compress_data(message_set)
      encoded = Protocol::Encoder.encode_with(message_set)

      @instrumenter.instrument("compress.compressor") do |payload|
        @codec.compress(encoded).tap do |packed|
          payload[:message_count] = message_set.size
          payload[:uncompressed_bytesize] = encoded.bytesize
          payload[:compressed_bytesize] = packed.bytesize
        end
      end
    end
  end
end

Version data entries

12 entries across 12 versions & 1 rubygem

Version Path
ruby-kafka-0.6.0.beta4 lib/kafka/compressor.rb
ruby-kafka-0.6.0.beta3 lib/kafka/compressor.rb
ruby-kafka-0.6.0.beta2 lib/kafka/compressor.rb
ruby-kafka-0.6.0.beta1 lib/kafka/compressor.rb
ruby-kafka-0.5.5 lib/kafka/compressor.rb
ruby-kafka-0.5.4 lib/kafka/compressor.rb
ruby-kafka-0.5.4.beta1 lib/kafka/compressor.rb
ruby-kafka-0.5.3 lib/kafka/compressor.rb
ruby-kafka-0.5.2 lib/kafka/compressor.rb
ruby-kafka-0.5.2.beta3 lib/kafka/compressor.rb
ruby-kafka-0.5.2.beta2 lib/kafka/compressor.rb
ruby-kafka-0.5.2.beta1 lib/kafka/compressor.rb