Sha256: ea6b950d897bcc6b411857d350042a60c31b17465cfcff25d883ffb18869ecc0

Contents?: true

Size: 1.91 KB

Versions: 16

Compression:

Stored size: 1.91 KB

Contents

# frozen_string_literal: true

module Karafka
  module Routing
    # rdkafka allows us to group topics subscriptions when they have same settings.
    # This builder groups topics from a single consumer group into subscription groups that can be
    # subscribed with one rdkafka connection.
    # This way we save resources as having several rdkafka consumers under the hood is not the
    # cheapest thing in a bigger system.
    #
    # In general, if we can, we try to subscribe to as many topics with one rdkafka connection as
    # possible, but if not possible, we divide.
    class SubscriptionGroupsBuilder
      # Keys used to build up a hash for subscription groups distribution.
      # In order to be able to use the same rdkafka connection for several topics, those keys need
      # to have same values.
      DISTRIBUTION_KEYS = %i[
        kafka
        max_messages
        max_wait_time
        initial_offset
      ].freeze

      private_constant :DISTRIBUTION_KEYS

      # @param topics [Karafka::Routing::Topics] all the topics based on which we want to build
      #   subscription groups
      # @return [Array<SubscriptionGroup>] all subscription groups we need in separate threads
      def call(topics)
        topics
          .map { |topic| [checksum(topic), topic] }
          .group_by(&:first)
          .values
          .map { |value| value.map(&:last) }
          .map { |topics_array| Routing::Topics.new(topics_array) }
          .map { |grouped_topics| SubscriptionGroup.new(grouped_topics) }
      end

      private

      # @param topic [Karafka::Routing::Topic] topic for which we compute the grouping checksum
      # @return [Integer] checksum that we can use to check if topics have the same set of
      #   settings based on which we group
      def checksum(topic)
        accu = {}

        DISTRIBUTION_KEYS.each { |key| accu[key] = topic.public_send(key) }

        accu.hash
      end
    end
  end
end

Version data entries

16 entries across 16 versions & 1 rubygems

Version Path
karafka-2.0.5 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.4 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.3 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.2 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.1 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.0 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.0.rc6 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.0.rc5 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.0.rc4 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.0.rc3 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.0.rc2 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.0.rc1 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.0.beta5 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.0.beta4 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.0.beta3 lib/karafka/routing/subscription_groups_builder.rb
karafka-2.0.0.beta2 lib/karafka/routing/subscription_groups_builder.rb