lib/karafka/connection/rebalance_manager.rb: karafka-2.2.7 vs karafka-2.2.8.beta1
- old
+ new
@@ -1,10 +1,10 @@
# frozen_string_literal: true
module Karafka
module Connection
- # Manager for tracking changes in the partitions assignment.
+ # Manager for tracking changes in the partitions assignment after the assignment is done.
#
# We need tracking of those to clean up consumers that will no longer process given partitions
# as they were taken away.
#
# @note Since this does not happen really often, we try to stick with same objects for the
@@ -15,24 +15,33 @@
# expose this concept outside and we normalize to have them revoked, as it is irrelevant
# from the rest of the code perspective as only those that are lost are truly revoked.
#
# @note For cooperative-sticky `#assigned_partitions` holds only the recently assigned
# partitions, not all the partitions that are owned
+ #
+ # @note We have to have the `subscription_group` reference because we have a global pipeline
+ # for notifications and we need to make sure we track changes only for things that are of
+ # relevance to our subscription group
class RebalanceManager
# Empty array for internal usage not to create new objects
EMPTY_ARRAY = [].freeze
attr_reader :assigned_partitions, :revoked_partitions
private_constant :EMPTY_ARRAY
+ # @param subscription_group_id [String] subscription group id
# @return [RebalanceManager]
- def initialize
+ def initialize(subscription_group_id)
@assigned_partitions = {}
@revoked_partitions = {}
@changed = false
@active = false
+ @subscription_group_id = subscription_group_id
+
+ # Connects itself to the instrumentation pipeline so rebalances can be tracked
+ ::Karafka.monitor.subscribe(self)
end
# Resets the rebalance manager state
# This needs to be done before each polling loop as during the polling, the state may be
# changed
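With this hunk the manager is no longer a plain value object: it is built per subscription group and attaches itself to the global monitor in its constructor. A minimal sketch of the new construction, assuming the caller holds a subscription group object exposing its id (the `subscription_group.id` accessor below is illustrative, not taken from this diff):

  # Built once per subscription group; subscribes itself to ::Karafka.monitor,
  # so rdkafka rebalance callbacks no longer have to be handed to it directly.
  rebalance_manager = Karafka::Connection::RebalanceManager.new(subscription_group.id)

Because the manager passes itself to `::Karafka.monitor.subscribe`, the monitor dispatches matching events to its `on_*` methods, which is why the second hunk below renames the callbacks.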
@@ -53,38 +62,44 @@
# @see https://github.com/confluentinc/librdkafka/issues/4312
def active?
@active
end
- # Callback that kicks in inside of rdkafka, when new partitions are assigned.
+ # We consider as lost only partitions that were taken away and not re-assigned back to us
+ def lost_partitions
+ lost_partitions = {}
+
+ revoked_partitions.each do |topic, partitions|
+ lost_partitions[topic] = partitions - assigned_partitions.fetch(topic, EMPTY_ARRAY)
+ end
+
+ lost_partitions
+ end
+
+ # Callback that kicks in inside of rdkafka, when new partitions were assigned.
#
# @private
- # @param partitions [Rdkafka::Consumer::TopicPartitionList]
- def on_partitions_assigned(partitions)
+ # @param event [Karafka::Core::Monitoring::Event]
+ def on_rebalance_partitions_assigned(event)
+ # Apply changes only for our subscription group
+ return unless event[:subscription_group_id] == @subscription_group_id
+
@active = true
- @assigned_partitions = partitions.to_h.transform_values { |part| part.map(&:partition) }
+ @assigned_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
@changed = true
end
- # Callback that kicks in inside of rdkafka, when partitions are revoked.
+ # Callback that kicks in inside of rdkafka, when partitions were revoked.
#
# @private
- # @param partitions [Rdkafka::Consumer::TopicPartitionList]
- def on_partitions_revoked(partitions)
+ # @param event [Karafka::Core::Monitoring::Event]
+ def on_rebalance_partitions_revoked(event)
+ # Apply changes only for our subscription group
+ return unless event[:subscription_group_id] == @subscription_group_id
+
@active = true
- @revoked_partitions = partitions.to_h.transform_values { |part| part.map(&:partition) }
+ @revoked_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
@changed = true
- end
-
- # We consider as lost only partitions that were taken away and not re-assigned back to us
- def lost_partitions
- lost_partitions = {}
-
- revoked_partitions.each do |topic, partitions|
- lost_partitions[topic] = partitions - assigned_partitions.fetch(topic, EMPTY_ARRAY)
- end
-
- lost_partitions
end
end
end
end
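Taken together, the second hunk replaces the direct rdkafka callbacks (`on_partitions_assigned` / `on_partitions_revoked`, each receiving an `Rdkafka::Consumer::TopicPartitionList`) with monitor event handlers that read the TPL and the subscription group id out of the event payload. A hedged sketch of the resulting flow, assuming the events are registered on the Karafka notifications bus as 'rebalance.partitions_assigned' and 'rebalance.partitions_revoked' (names inferred from the `on_rebalance_*` hook methods) and that the publishing side supplies the same payload keys the code above reads:

  # Somewhere in the listener/client layer (illustrative only), the rdkafka
  # rebalance callback is translated into a global monitor event:
  ::Karafka.monitor.instrument(
    'rebalance.partitions_revoked',
    subscription_group_id: subscription_group.id, # assumed accessor
    tpl: tpl # the Rdkafka::Consumer::TopicPartitionList from the rebalance callback
  )

  # Every RebalanceManager subscribed to the monitor receives this event via
  # #on_rebalance_partitions_revoked, but only the instance whose
  # @subscription_group_id matches the payload updates its state.

  # After a rebalance, partitions that were revoked and not re-assigned back
  # are reported as lost, e.g.:
  rebalance_manager.lost_partitions #=> { 'orders' => [0, 2] } (illustrative values)

The guard on `event[:subscription_group_id]` in both handlers exists because the monitoring pipeline is shared by every subscription group in the process, so each manager has to filter out rebalance events that belong to other groups.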