# frozen_string_literal: true module Karafka module Routing # Object used to describe a single consumer group that is going to subscribe to # given topics # It is a part of Karafka's DSL # @note A single consumer group represents Kafka consumer group, but it may not match 1:1 with # subscription groups. There can be more subscription groups than consumer groups class ConsumerGroup attr_reader :id, :topics, :name # This is a "virtual" attribute that is not building subscription groups. # It allows us to store the "current" subscription group defined in the routing # This subscription group id is then injected into topics, so we can compute the subscription # groups attr_accessor :current_subscription_group_id # @param name [String, Symbol] raw name of this consumer group. Raw means, that it does not # yet have an application client_id namespace, this will be added here by default. # We add it to make a multi-system development easier for people that don't use # kafka and don't understand the concept of consumer groups. def initialize(name) @name = name.to_s @id = Karafka::App.config.consumer_mapper.call(name) @topics = Topics.new([]) # Initialize the subscription group so there's always a value for it, since even if not # defined directly, a subscription group will be created @current_subscription_group_id = SubscriptionGroup.id end # @return [Boolean] true if this consumer group should be active in our current process def active? cgs = Karafka::App.config.internal.routing.active.consumer_groups # When empty it means no groups were specified, hence all should be used cgs.empty? || cgs.include?(name) end # Builds a topic representation inside of a current consumer group route # @param name [String, Symbol] name of topic to which we want to subscribe # @param block [Proc] block that we want to evaluate in the topic context # @return [Karafka::Routing::Topic] newly built topic instance def topic=(name, &block) topic = Topic.new(name, self) @topics << Proxy.new(topic, &block).target built_topic = @topics.last # We overwrite it conditionally in case it was not set by the user inline in the topic # block definition built_topic.subscription_group ||= current_subscription_group_id built_topic end # Assigns the current subscription group id based on the defined one and allows for further # topic definition # @param name [String, Symbol] name of the current subscription group # @param block [Proc] block that may include topics definitions def subscription_group=(name = SubscriptionGroup.id, &block) # We cast it here, so the routing supports symbol based but that's anyhow later on # validated as a string @current_subscription_group_id = name Proxy.new(self, &block) # We need to reset the current subscription group after it is used, so it won't leak # outside to other topics that would be defined without a defined subscription group @current_subscription_group_id = SubscriptionGroup.id end # @return [Array<Routing::SubscriptionGroup>] all the subscription groups build based on # the consumer group topics def subscription_groups @subscription_groups ||= App .config .internal .routing .subscription_groups_builder .call(topics) end # Hashed version of consumer group that can be used for validation purposes # @return [Hash] hash with consumer group attributes including serialized to hash # topics inside of it. def to_h { topics: topics.map(&:to_h), id: id }.freeze end end end end