lib/karafka/connection/rebalance_manager.rb in karafka-2.2.7 vs lib/karafka/connection/rebalance_manager.rb in karafka-2.2.8.beta1

- old
+ new

@@ -1,10 +1,10 @@
 # frozen_string_literal: true

 module Karafka
   module Connection
-    # Manager for tracking changes in the partitions assignment.
+    # Manager for tracking changes in the partitions assignment after the assignment is done.
     #
     # We need tracking of those to clean up consumers that will no longer process given partitions
     # as they were taken away.
     #
     # @note Since this does not happen really often, we try to stick with same objects for the
@@ -15,24 +15,33 @@
     # expose this concept outside and we normalize to have them revoked, as it is irrelevant
     # from the rest of the code perspective as only those that are lost are truly revoked.
     #
     # @note For cooperative-sticky `#assigned_partitions` holds only the recently assigned
     # partitions, not all the partitions that are owned
+    #
+    # @note We have to have the `subscription_group` reference because we have a global pipeline
+    # for notifications and we need to make sure we track changes only for things that are of
+    # relevance to our subscription group
     class RebalanceManager
       # Empty array for internal usage not to create new objects
       EMPTY_ARRAY = [].freeze

       attr_reader :assigned_partitions, :revoked_partitions

       private_constant :EMPTY_ARRAY

+      # @param subscription_group_id [String] subscription group id
       # @return [RebalanceManager]
-      def initialize
+      def initialize(subscription_group_id)
         @assigned_partitions = {}
         @revoked_partitions = {}
         @changed = false
         @active = false
+        @subscription_group_id = subscription_group_id
+
+        # Connects itself to the instrumentation pipeline so rebalances can be tracked
+        ::Karafka.monitor.subscribe(self)
       end

       # Resets the rebalance manager state
       # This needs to be done before each polling loop as during the polling, the state may be
       # changed
@@ -53,38 +62,44 @@
       # @see https://github.com/confluentinc/librdkafka/issues/4312
       def active?
         @active
       end

-      # Callback that kicks in inside of rdkafka, when new partitions are assigned.
+      # We consider as lost only partitions that were taken away and not re-assigned back to us
+      def lost_partitions
+        lost_partitions = {}
+
+        revoked_partitions.each do |topic, partitions|
+          lost_partitions[topic] = partitions - assigned_partitions.fetch(topic, EMPTY_ARRAY)
+        end
+
+        lost_partitions
+      end
+
+      # Callback that kicks in inside of rdkafka, when new partitions were assigned.
       #
       # @private
-      # @param partitions [Rdkafka::Consumer::TopicPartitionList]
-      def on_partitions_assigned(partitions)
+      # @param event [Karafka::Core::Monitoring::Event]
+      def on_rebalance_partitions_assigned(event)
+        # Apply changes only for our subscription group
+        return unless event[:subscription_group_id] == @subscription_group_id
+
         @active = true
-        @assigned_partitions = partitions.to_h.transform_values { |part| part.map(&:partition) }
+        @assigned_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
         @changed = true
       end

-      # Callback that kicks in inside of rdkafka, when partitions are revoked.
+      # Callback that kicks in inside of rdkafka, when partitions were revoked.
       #
       # @private
-      # @param partitions [Rdkafka::Consumer::TopicPartitionList]
-      def on_partitions_revoked(partitions)
+      # @param event [Karafka::Core::Monitoring::Event]
+      def on_rebalance_partitions_revoked(event)
+        # Apply changes only for our subscription group
+        return unless event[:subscription_group_id] == @subscription_group_id
+
         @active = true
-        @revoked_partitions = partitions.to_h.transform_values { |part| part.map(&:partition) }
+        @revoked_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
         @changed = true
-      end
-
-      # We consider as lost only partitions that were taken away and not re-assigned back to us
-      def lost_partitions
-        lost_partitions = {}
-
-        revoked_partitions.each do |topic, partitions|
-          lost_partitions[topic] = partitions - assigned_partitions.fetch(topic, EMPTY_ARRAY)
-        end
-
-        lost_partitions
       end
     end
   end
 end
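
The relocated `lost_partitions` helper is easiest to read in isolation: a partition counts as lost only when it was revoked and not handed back to this process during the same rebalance. A standalone sketch of that set arithmetic (plain Ruby with made-up sample data, not Karafka source):

  # Revocations and fresh assignments observed during one rebalance (sample data)
  revoked_partitions  = { 'orders' => [0, 1, 2] }
  assigned_partitions = { 'orders' => [1] }

  # Lost = revoked minus whatever was re-assigned back to us
  lost_partitions = revoked_partitions.each_with_object({}) do |(topic, partitions), lost|
    lost[topic] = partitions - assigned_partitions.fetch(topic, [])
  end

  puts lost_partitions.inspect # => {"orders"=>[0, 2]}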
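
The other substantive change is the wiring: the manager no longer receives rdkafka callbacks directly but subscribes itself to `::Karafka.monitor` and filters incoming events by `subscription_group_id`. A rough usage sketch, assuming the `rebalance.partitions_assigned` event name implied by the `on_rebalance_partitions_assigned` listener method is registered on the default notifications bus, and with a hand-built topic partition list standing in for the one librdkafka would provide:

  require 'karafka'

  # Each manager instance is scoped to a single subscription group id
  manager = Karafka::Connection::RebalanceManager.new('subscription_group_0')

  # Something resembling what librdkafka would hand over during a rebalance
  tpl = Rdkafka::Consumer::TopicPartitionList.new
  tpl.add_topic('orders', [0, 1])

  # Publish on the global pipeline; the manager reacts only when the payload's
  # subscription_group_id matches its own
  ::Karafka.monitor.instrument(
    'rebalance.partitions_assigned',
    subscription_group_id: 'subscription_group_0',
    tpl: tpl
  )

  manager.assigned_partitions # => { 'orders' => [0, 1] }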