Sha256: 5c325cf1356986c260a2f89d1fa27cd20f0733e376bf4d8cdcc34348ec6bcd18

Contents?: true

Size: 1.99 KB

Versions: 10

Compression:

Stored size: 1.99 KB

Contents

# frozen_string_literal: true

require "kafka/compression"

module Kafka

  # Compresses message sets using a specified codec.
  #
  # A message set is only compressed if its size meets the defined threshold.
  #
  # ## Instrumentation
  #
  # Whenever a message set is compressed, the notification
  # `compress.compressor.kafka` will be emitted with the following payload:
  #
  # * `message_count` – the number of messages in the message set.
  # * `uncompressed_bytesize` – the byte size of the original data.
  # * `compressed_bytesize` – the byte size of the compressed data.
  #
  class Compressor

    # @param codec_name [Symbol, nil]
    # @param threshold [Integer] the minimum number of messages in a message set
    #   that will trigger compression.
    def initialize(codec_name: nil, threshold: 1, instrumenter:)
      # Codec may be nil, in which case we won't compress.
      @codec = codec_name && Compression.find_codec(codec_name)

      @threshold = threshold
      @instrumenter = instrumenter
    end

    # @param message_set [Protocol::MessageSet]
    # @param offset [Integer] used to simulate broker behaviour in tests
    # @return [Protocol::MessageSet]
    def compress(message_set, offset: -1)
      return message_set if @codec.nil? || message_set.size < @threshold

      compressed_data = compress_data(message_set)

      wrapper_message = Protocol::Message.new(
        value: compressed_data,
        codec_id: @codec.codec_id,
        offset: offset
      )

      Protocol::MessageSet.new(messages: [wrapper_message])
    end

    private

    def compress_data(message_set)
      data = Protocol::Encoder.encode_with(message_set)

      @instrumenter.instrument("compress.compressor") do |notification|
        compressed_data = @codec.compress(data)

        notification[:message_count] = message_set.size
        notification[:uncompressed_bytesize] = data.bytesize
        notification[:compressed_bytesize] = compressed_data.bytesize

        compressed_data
      end
    end
  end
end

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
ruby-kafka-0.6.8 lib/kafka/compressor.rb
ruby-kafka-0.6.7 lib/kafka/compressor.rb
ruby-kafka-0.6.6 lib/kafka/compressor.rb
ruby-kafka-0.6.5 lib/kafka/compressor.rb
ruby-kafka-0.6.4 lib/kafka/compressor.rb
ruby-kafka-0.7.0.alpha1 lib/kafka/compressor.rb
ruby-kafka-0.6.3 lib/kafka/compressor.rb
ruby-kafka-0.6.2 lib/kafka/compressor.rb
ruby-kafka-0.6.1 lib/kafka/compressor.rb
ruby-kafka-0.6.0 lib/kafka/compressor.rb