Sha256: 4558678269fcd4b1bf0fad0697f68a26310ea9590eb8da2c193e7aeef4005936

Contents?: true

Size: 1.34 KB

Versions: 9

Compression:

Stored size: 1.34 KB

Contents

# frozen_string_literal: true

require "kafka/digest"

module Kafka

  # Assigns partitions to messages.
  class Partitioner
    # @param hash_function [Symbol, nil] the algorithm used to compute a messages
    #   destination partition. Default is :crc32
    def initialize(hash_function: nil)
      @digest = Digest.find_digest(hash_function || :crc32)
    end

    # Assigns a partition number based on a partition key. If no explicit
    # partition key is provided, the message key will be used instead.
    #
    # If the key is nil, then a random partition is selected. Otherwise, a digest
    # of the key is used to deterministically find a partition. As long as the
    # number of partitions doesn't change, the same key will always be assigned
    # to the same partition.
    #
    # @param partition_count [Integer] the number of partitions in the topic.
    # @param message [Kafka::PendingMessage] the message that should be assigned
    #   a partition.
    # @return [Integer] the partition number.
    def call(partition_count, message)
      raise ArgumentError if partition_count == 0

      # If no explicit partition key is specified we use the message key instead.
      key = message.partition_key || message.key

      if key.nil?
        rand(partition_count)
      else
        @digest.hash(key) % partition_count
      end
    end
  end
end

Version data entries

9 entries across 9 versions & 3 rubygems

Version Path
ruby-kafka-1.5.0 lib/kafka/partitioner.rb
ruby-kafka-aws-iam-1.4.5 lib/kafka/partitioner.rb
ruby-kafka-aws-iam-1.4.4 lib/kafka/partitioner.rb
ruby-kafka-aws-iam-1.4.3 lib/kafka/partitioner.rb
ruby-kafka-aws-iam-1.4.2 lib/kafka/partitioner.rb
ruby-kafka-aws-iam-1.4.1 lib/kafka/partitioner.rb
ruby-kafka-1.4.0 lib/kafka/partitioner.rb
ruby-kafka-temp-fork-0.0.2 lib/kafka/partitioner.rb
ruby-kafka-temp-fork-0.0.1 lib/kafka/partitioner.rb