Sha256: 9cb7f8e95422a8176d71d7c0811f9f5044edd900126b6d4b9740a36682f1e3d6

Contents?: true

Size: 1.7 KB

Versions: 2

Compression:

Stored size: 1.7 KB

Contents

# frozen_string_literal: true

# This Karafka component is a Pro component.
# All of the commercial components are present in the lib/karafka/pro directory of this
# repository and their usage requires commercial license agreement.
#
# Karafka has also commercial-friendly license, commercial support and commercial components.
#
# By sending a pull request to the pro components, you are agreeing to transfer the copyright of
# your code to Maciej Mensfeld.

module Karafka
  module Pro
    module Processing
      # Pro partitioner that can distribute work based on the virtual partitioner settings
      class Partitioner < ::Karafka::Processing::Partitioner
        # @param topic [String] topic name
        # @param messages [Array<Karafka::Messages::Message>] karafka messages
        # @yieldparam [Integer] group_id id of the virtual group (0 when work is not partitioned)
        # @yieldparam [Array<Karafka::Messages::Message>] messages_group messages of that group
        def call(topic, messages)
          ktopic = @subscription_group.topics.find(topic)

          @concurrency ||= ::Karafka::App.config.concurrency

          # We only partition work if we have a virtual partitioner and more than one thread to
          # process the data. With a single thread, every group would end up on that one thread
          # anyway, so partitioning would only add overhead
          if ktopic.virtual_partitioner? && @concurrency > 1
            messages
              .group_by { |msg| ktopic.virtual_partitioner.call(msg).hash.abs % @concurrency }
              .each { |group_id, messages_group| yield(group_id, messages_group) }
          else
            # Without a virtual partitioner (or with a single thread), behave like the regular
            # partitioner and yield all messages as a single group
            yield(0, messages)
          end
        end
      end
    end
  end
end
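
The grouping step in `call` can be tried in isolation. The following is a minimal sketch, not Karafka's API: `Message` and `partition` are hypothetical stand-ins introduced here, and the block passed to `partition` plays the role of `ktopic.virtual_partitioner`. It assumes only what the listing above shows, namely that the partitioner result is hashed and reduced modulo the concurrency.

```ruby
# Hypothetical stand-in for Karafka::Messages::Message; only a key and a payload are modelled.
Message = Struct.new(:key, :payload)

# Hypothetical helper mirroring the branch in Partitioner#call:
# splits messages into at most `concurrency` groups, or returns one group (id 0)
# when there is no virtual partitioner block or only one thread.
def partition(messages, concurrency, &virtual_partitioner)
  return { 0 => messages } if virtual_partitioner.nil? || concurrency <= 1

  messages.group_by { |msg| virtual_partitioner.call(msg).hash.abs % concurrency }
end

messages = %w[user-1 user-2 user-3 user-1 user-2].each_with_index.map do |key, i|
  Message.new(key, "event-#{i}")
end

# Partition by message key across three (assumed) worker threads
groups = partition(messages, 3) { |msg| msg.key }

groups.each do |group_id, batch|
  puts "group #{group_id}: #{batch.map(&:payload).join(', ')}"
end
```

Messages that share a key always land in the same group, and order within each group is preserved because `group_by` keeps input order. The concrete group ids differ between runs, since Ruby seeds `String#hash` per process.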

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
karafka-2.0.0.rc2 lib/karafka/pro/processing/partitioner.rb
karafka-2.0.0.rc1 lib/karafka/pro/processing/partitioner.rb