Sha256: d9fe1405bcb3aec7a6a3b2f595f4893034d5e2c47455a00ccf3c2f7c67e3d448
Contents?: true
Size: 1.55 KB
Versions: 14
Compression:
Stored size: 1.55 KB
Contents
# frozen_string_literal: true

require "kafka/protocol/member_assignment"

module Kafka
  # A consumer group partition assignment strategy that assigns partitions to
  # consumers in a round-robin fashion.
  class RoundRobinAssignmentStrategy
    # @param cluster [#partitions_for] object that is asked for the partitions
    #   of each topic; every returned partition must respond to `partition_id`.
    def initialize(cluster:)
      @cluster = cluster
    end

    # Assign the topic partitions to the group members.
    #
    # Partitions are enumerated topic by topic, in the order given by
    # `topics`, and dealt out one at a time to the members in the order given
    # by `members`, wrapping around when the member list is exhausted. Members
    # that receive no partition still get an (empty) assignment.
    #
    # If `Kafka::LeaderNotAvailable` is raised while computing the assignment,
    # the method sleeps for one second and starts over.
    #
    # @param members [Array<String>] member ids
    # @param topics [Array<String>] topics
    # @return [Hash<String, Protocol::MemberAssignment>] a hash mapping member
    #   ids to assignments. Empty when `members` is empty.
    # @raise [Kafka::UnknownTopicOrPartition] if the cluster does not know one
    #   of the topics.
    def assign(members:, topics:)
      member_ids = members.to_a

      group_assignment = member_ids.each_with_object({}) do |member_id, assignment|
        assignment[member_id] = Protocol::MemberAssignment.new
      end

      # Resolve every topic before assigning anything, so that an unknown topic
      # is reported regardless of how many members there are.
      topic_partitions = topics.flat_map do |topic|
        partition_ids_for(topic).map { |partition_id| [topic, partition_id] }
      end

      # With no members there is nobody to assign to; without this guard the
      # modulo below would raise ZeroDivisionError.
      return group_assignment if member_ids.empty?

      topic_partitions.each_with_index do |(topic, partition), index|
        member_id = member_ids[index % member_ids.size]
        group_assignment[member_id].assign(topic, [partition])
      end

      group_assignment
    rescue Kafka::LeaderNotAvailable
      # NOTE(review): this retry is unbounded, so the call blocks for as long
      # as the error keeps being raised. Kept as-is because callers may rely on
      # the blocking behaviour; consider adding an attempt limit.
      sleep 1
      retry
    end

    private

    # Returns the partition ids of `topic`, re-raising an unknown-topic error
    # with a message that names the offending topic.
    def partition_ids_for(topic)
      @cluster.partitions_for(topic).map(&:partition_id)
    rescue UnknownTopicOrPartition
      raise UnknownTopicOrPartition, "unknown topic #{topic}"
    end
  end
end
Version data entries
14 entries across 14 versions & 2 rubygems