Sha256: e4c6dc822b9bcd956ee72f68d12a92fcd472a91ae808dc7179f97df7610cdc26

Contents?: true

Size: 1.7 KB

Versions: 9

Compression:

Stored size: 1.7 KB

Contents

module Kafka

  # A round robin assignment strategy inpired on the
  # original java client round robin assignor. It's capable
  # of handling identical as well as different topic subscriptions
  # accross the same consumer group.
  class RoundRobinAssignmentStrategy
    def protocol_name
      "roundrobin"
    end

    # Assign the topic partitions to the group members.
    #
    # @param cluster [Kafka::Cluster]
    # @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
    #   mapping member ids to metadata
    # @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
    #   partitions the consumer group processes
    # @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>] a hash
    #   mapping member ids to partitions.
    def call(cluster:, members:, partitions:)
      partitions_per_member = Hash.new {|h, k| h[k] = [] }
      relevant_partitions = valid_sorted_partitions(members, partitions)
      members_ids = members.keys
      iterator = (0...members.size).cycle
      idx = iterator.next

      relevant_partitions.each do |partition|
        topic = partition.topic

        while !members[members_ids[idx]].topics.include?(topic)
          idx = iterator.next
        end

        partitions_per_member[members_ids[idx]] << partition
        idx = iterator.next
      end

      partitions_per_member
    end

    def valid_sorted_partitions(members, partitions)
      subscribed_topics = members.map do |id, metadata|
        metadata && metadata.topics
      end.flatten.compact

      partitions
        .select { |partition| subscribed_topics.include?(partition.topic) }
        .sort_by { |partition| partition.topic }
    end
  end
end

Version data entries

9 entries across 9 versions & 3 rubygems

Version Path
ruby-kafka-1.5.0 lib/kafka/round_robin_assignment_strategy.rb
ruby-kafka-aws-iam-1.4.5 lib/kafka/round_robin_assignment_strategy.rb
ruby-kafka-aws-iam-1.4.4 lib/kafka/round_robin_assignment_strategy.rb
ruby-kafka-aws-iam-1.4.3 lib/kafka/round_robin_assignment_strategy.rb
ruby-kafka-aws-iam-1.4.2 lib/kafka/round_robin_assignment_strategy.rb
ruby-kafka-aws-iam-1.4.1 lib/kafka/round_robin_assignment_strategy.rb
ruby-kafka-1.4.0 lib/kafka/round_robin_assignment_strategy.rb
ruby-kafka-temp-fork-0.0.2 lib/kafka/round_robin_assignment_strategy.rb
ruby-kafka-temp-fork-0.0.1 lib/kafka/round_robin_assignment_strategy.rb