Sha256: e5015e5b61cd6169ae223d0db0b3b6f7fe380b62041d51d9c9e5600da2e61398

Contents?: true

Size: 782 Bytes

Versions: 8

Compression:

Stored size: 782 Bytes

Contents

require "zlib"

module Kafka

  # Assigns partitions to messages.
  class Partitioner
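    # @param partitions a collection of partitions; only its size (via
    #   `count`) is used when picking a partition number.
    def initialize(partitions)
      @partitions = partitions
    end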

    # Assigns a partition number based on a key.
    #
    # If the key is nil, then a random partition is selected. Otherwise, a digest
    # of the key is used to deterministically find a partition. As long as the
    # number of partitions doesn't change, the same key will always be assigned
    # to the same partition.
    #
    # @param key [String, nil] the key to base the partition assignment on, or nil.
    # @return [Integer] the partition number.
    def partition_for_key(key)
      if key.nil?
        rand(@partitions.count)
      else
        Zlib.crc32(key) % @partitions.count
      end
    end
  end
end
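
Usage sketch (not part of the gem file): the snippet below illustrates the behaviour described in the comments above. It repeats the class body so it runs without the gem installed. The partition list `[0, 1, 2, 3]` and the key `"user-42"` are made-up values for illustration; the real library may pass a different kind of collection.

```ruby
require "zlib"

# Stand-in copy of the class listed above, so this sketch is self-contained.
module Kafka
  class Partitioner
    def initialize(partitions)
      @partitions = partitions
    end

    def partition_for_key(key)
      if key.nil?
        rand(@partitions.count)
      else
        Zlib.crc32(key) % @partitions.count
      end
    end
  end
end

# Hypothetical partition list; only its size matters to the partitioner.
partitions = [0, 1, 2, 3]
partitioner = Kafka::Partitioner.new(partitions)

# A non-nil key is hashed with CRC32, so repeated calls agree as long as
# the number of partitions stays the same.
first  = partitioner.partition_for_key("user-42")
second = partitioner.partition_for_key("user-42")
puts "same partition for same key: #{first == second}"

# A nil key falls back to a random partition within 0...partitions.count.
random_partition = partitioner.partition_for_key(nil)
puts "random partition in range: #{(0...partitions.count).cover?(random_partition)}"
```

Because the result is the CRC32 digest modulo the partition count, adding or removing partitions can move an existing key to a different partition.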

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
ruby-kafka-0.1.3 lib/kafka/partitioner.rb
ruby-kafka-0.1.2 lib/kafka/partitioner.rb
ruby-kafka-0.1.1 lib/kafka/partitioner.rb
ruby-kafka-0.1.0 lib/kafka/partitioner.rb
ruby-kafka-0.1.0.pre.beta5 lib/kafka/partitioner.rb
ruby-kafka-0.1.0.pre.beta4 lib/kafka/partitioner.rb
ruby-kafka-0.1.0.pre.beta3 lib/kafka/partitioner.rb
ruby-kafka-0.1.0.pre.beta2 lib/kafka/partitioner.rb