# frozen_string_literal: true # This Karafka component is a Pro component. # All of the commercial components are present in the lib/karafka/pro directory of this # repository and their usage requires commercial license agreement. # # Karafka has also commercial-friendly license, commercial support and commercial components. # # By sending a pull request to the pro components, you are agreeing to transfer the copyright of # your code to Maciej Mensfeld. module Karafka module Pro module Processing # Pro partitioner that can distribute work based on the virtual partitioner settings class Partitioner < ::Karafka::Processing::Partitioner # @param topic [String] topic name # @param messages [Array] karafka messages # @yieldparam [Integer] group id # @yieldparam [Array] karafka messages def call(topic, messages) ktopic = @subscription_group.topics.find(topic) @concurrency ||= ::Karafka::App.config.concurrency # We only partition work if we have a virtual partitioner and more than one thread to # process the data. With one thread it is not worth partitioning the work as the work # itself will be assigned to one thread (pointless work) if ktopic.virtual_partitioner? && @concurrency > 1 # We need to reduce it to number of threads, so the group_id is not a direct effect # of the end user action. Otherwise the persistence layer for consumers would cache # it forever and it would cause memory leaks groupings = messages .group_by { |msg| ktopic.virtual_partitioner.call(msg) } .values # Reduce the max concurrency to a size that matches the concurrency # As mentioned above we cannot use the partitioning keys directly as it could cause # memory leaks # # The algorithm here is simple, we assume that the most costly in terms of processing, # will be processing of the biggest group and we reduce the smallest once to have # max of groups equal to concurrency while groupings.size > @concurrency groupings.sort_by! { |grouping| -grouping.size } # Offset order needs to be maintained for virtual partitions groupings << (groupings.pop + groupings.pop).sort_by!(&:offset) end groupings.each_with_index { |messages_group, index| yield(index, messages_group) } else # When no virtual partitioner, works as regular one yield(0, messages) end end end end end end