lib/karafka/connection/rebalance_manager.rb in karafka-2.0.0.alpha5 vs lib/karafka/connection/rebalance_manager.rb in karafka-2.0.0.alpha6
- old
+ new
@@ -7,56 +7,71 @@
# We need tracking of those to clean up consumers that will no longer process given partitions
# as they were taken away.
#
# @note Since this does not happen really often, we try to stick with same objects for the
# empty states most of the time, so we don't create many objects during the manager life
+ #
+ # @note Internally in the rebalance manager we have a notion of lost partitions. Partitions
+ # that are lost, are those that got revoked but did not get re-assigned back. We do not
+ # expose this concept outside and we normalize to have them revoked, as it is irrelevant
+ # from the rest of the code perspective as only those that are lost are truly revoked.
class RebalanceManager
+ # Empty array for internal usage not to create new objects
+ EMPTY_ARRAY = [].freeze
+
+ private_constant :EMPTY_ARRAY
+
# @return [RebalanceManager]
def initialize
- @assigned = {}
- @revoked = {}
+ @assigned_partitions = {}
+ @revoked_partitions = {}
+ @lost_partitions = {}
end
- # @return [Hash<String, Array<Integer>>] hash where the keys are the names of topics for
- # which we've got new partitions assigned and array with ids of the partitions as the value
- # @note Once assigned partitions are fetched, the state will be reset since the callbacks
- # for new assigned partitions are set only during a state change
- def assigned_partitions
- return @assigned if @assigned.empty?
-
- result = @assigned.dup
- @assigned.clear
- result
+ # Resets the rebalance manager state
+ # This needs to be done before each polling loop as during the polling, the state may be
+ # changed
+ def clear
+ @assigned_partitions.clear
+ @revoked_partitions.clear
+ @lost_partitions.clear
end
# @return [Hash<String, Array<Integer>>] hash where the keys are the names of topics for
# which we've lost partitions and array with ids of the partitions as the value
- # @note Once revoked partitions are fetched, the state will be reset since the callbacks
- # for new revoked partitions are set only during a state change
+ # @note We do not consider as lost topics and partitions that got revoked and assigned
def revoked_partitions
- return @revoked if @revoked.empty?
+ return @revoked_partitions if @revoked_partitions.empty?
+ return @lost_partitions unless @lost_partitions.empty?
- result = @revoked.dup
- @revoked.clear
- result
+ @revoked_partitions.each do |topic, partitions|
+ @lost_partitions[topic] = partitions - @assigned_partitions.fetch(topic, EMPTY_ARRAY)
+ end
+
+ @lost_partitions
end
+ # @return [Boolean] true if any partitions were revoked
+ def revoked_partitions?
+ !revoked_partitions.empty?
+ end
+
# Callback that kicks in inside of rdkafka, when new partitions are assigned.
#
# @private
# @param _ [Rdkafka::Consumer]
# @param partitions [Rdkafka::Consumer::TopicPartitionList]
def on_partitions_assigned(_, partitions)
- @assigned = partitions.to_h.transform_values { |part| part.map(&:partition) }
+ @assigned_partitions = partitions.to_h.transform_values { |part| part.map(&:partition) }
end
# Callback that kicks in inside of rdkafka, when partitions are revoked.
#
# @private
# @param _ [Rdkafka::Consumer]
# @param partitions [Rdkafka::Consumer::TopicPartitionList]
def on_partitions_revoked(_, partitions)
- @revoked = partitions.to_h.transform_values { |part| part.map(&:partition) }
+ @revoked_partitions = partitions.to_h.transform_values { |part| part.map(&:partition) }
end
end
end
end