Sha256: f7e9594bac1bbd8d41306e4b5e0ea006be3025412b92b190ab9d568f145700f1

Contents?: true

Size: 1.79 KB

Versions: 22

Compression:

Stored size: 1.79 KB

Contents

# frozen_string_literal: true

module Karafka
  module Routing
    # Object representing a set of single consumer group topics that can be subscribed together
    # with one connection.
    #
    # @note One subscription group will always belong to one consumer group, but one consumer
    #   group can have multiple subscription groups.
    class SubscriptionGroup
      attr_reader :id, :topics

      # @param topics [Karafka::Routing::Topics] all the topics that share the same key settings
      # @return [SubscriptionGroup] built subscription group
      def initialize(topics)
        @id = SecureRandom.uuid
        @topics = topics
        freeze
      end

      # @return [String] consumer group id
      def consumer_group_id
        kafka[:'group.id']
      end

      # @return [Integer] max messages fetched in a single go
      def max_messages
        @topics.first.max_messages
      end

      # @return [Integer] max milliseconds we can wait for incoming messages
      def max_wait_time
        @topics.first.max_wait_time
      end

      # @return [Hash] kafka settings are a bit special. They are exactly the same for all of the
      #   topics but they lack the group.id (unless explicitly) provided. To make it compatible
      #   with our routing engine, we inject it before it will go to the consumer
      def kafka
        kafka = @topics.first.kafka.dup

        kafka[:'client.id'] ||= Karafka::App.config.client_id
        kafka[:'group.id'] ||= @topics.first.consumer_group.id
        kafka[:'auto.offset.reset'] ||= @topics.first.initial_offset
        # Karafka manages the offsets based on the processing state, thus we do not rely on the
        # rdkafka offset auto-storing
        kafka[:'enable.auto.offset.store'] = false
        kafka.freeze
        kafka
      end
    end
  end
end

Version data entries

22 entries across 22 versions & 1 rubygems

Version Path
karafka-2.0.12 lib/karafka/routing/subscription_group.rb
karafka-2.0.11 lib/karafka/routing/subscription_group.rb
karafka-2.0.10 lib/karafka/routing/subscription_group.rb
karafka-2.0.9 lib/karafka/routing/subscription_group.rb
karafka-2.0.8 lib/karafka/routing/subscription_group.rb
karafka-2.0.7 lib/karafka/routing/subscription_group.rb
karafka-2.0.6 lib/karafka/routing/subscription_group.rb
karafka-2.0.5 lib/karafka/routing/subscription_group.rb
karafka-2.0.4 lib/karafka/routing/subscription_group.rb
karafka-2.0.3 lib/karafka/routing/subscription_group.rb
karafka-2.0.2 lib/karafka/routing/subscription_group.rb
karafka-2.0.1 lib/karafka/routing/subscription_group.rb
karafka-2.0.0 lib/karafka/routing/subscription_group.rb
karafka-2.0.0.rc6 lib/karafka/routing/subscription_group.rb
karafka-2.0.0.rc5 lib/karafka/routing/subscription_group.rb
karafka-2.0.0.rc4 lib/karafka/routing/subscription_group.rb
karafka-2.0.0.rc3 lib/karafka/routing/subscription_group.rb
karafka-2.0.0.rc2 lib/karafka/routing/subscription_group.rb
karafka-2.0.0.rc1 lib/karafka/routing/subscription_group.rb
karafka-2.0.0.beta5 lib/karafka/routing/subscription_group.rb