Sha256: 27d4344df836e0974dceb7ae956066655dc67514abb7213152435c872320539a
Contents?: true
Size: 1.18 KB
Versions: 3
Compression:
Stored size: 1.18 KB
Contents
require "kafka/protocol/member_assignment" module Kafka # A consumer group partition assignment strategy that assigns partitions to # consumers in a round-robin fashion. class RoundRobinAssignmentStrategy def initialize(cluster:) @cluster = cluster end # Assign the topic partitions to the group members. # # @param members [Array<String>] member ids # @param topics [Array<String>] topics # @return [Hash<String, Protocol::MemberAssignment>] a hash mapping member # ids to assignments. def assign(members:, topics:) group_assignment = {} members.each do |member_id| group_assignment[member_id] = Protocol::MemberAssignment.new end topics.each do |topic| partitions = @cluster.partitions_for(topic).map(&:partition_id) partitions_per_member = partitions.group_by {|partition_id| partition_id % members.count }.values members.zip(partitions_per_member).each do |member_id, member_partitions| unless member_partitions.nil? group_assignment[member_id].assign(topic, member_partitions) end end end group_assignment end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
ruby-kafka-0.3.5 | lib/kafka/round_robin_assignment_strategy.rb |
ruby-kafka-0.3.4 | lib/kafka/round_robin_assignment_strategy.rb |
ruby-kafka-0.3.3 | lib/kafka/round_robin_assignment_strategy.rb |