Sha256: 6bed1989f4c5fb32c81e1e9e404c7df79c7da1ffbbfc1b2f35265f89de222194

Contents?: true

Size: 2 KB

Versions: 24

Compression:

Stored size: 2 KB

Contents

# frozen_string_literal: true

module Karafka
  module Routing
    # Object used to describe a single consumer group that is going to subscribe to
    # given topics
    # It is a part of Karafka's DSL
    class ConsumerGroup
      extend Helpers::ConfigRetriever

      attr_reader :topics
      attr_reader :id
      attr_reader :name

      # @param name [String, Symbol] raw name of this consumer group. Raw means, that it does not
      #   yet have an application client_id namespace, this will be added here by default.
      #   We add it to make a multi-system development easier for people that don't use
      #   kafka and don't understand the concept of consumer groups.
      def initialize(name)
        @name = name
        @id = Karafka::App.config.consumer_mapper.call(name)
        @topics = []
      end

      # @return [Boolean] true if this consumer group should be active in our current process
      def active?
        Karafka::Server.consumer_groups.include?(name)
      end

      # Builds a topic representation inside of a current consumer group route
      # @param name [String, Symbol] name of topic to which we want to subscribe
      # @yield Evaluates a given block in a topic context
      # @return [Karafka::Routing::Topic] newly built topic instance
      def topic=(name, &block)
        topic = Topic.new(name, self)
        @topics << Proxy.new(topic, &block).target.tap(&:build)
        @topics.last
      end

      Karafka::AttributesMap.consumer_group.each do |attribute|
        config_retriever_for(attribute)
      end

      # Hashed version of consumer group that can be used for validation purposes
      # @return [Hash] hash with consumer group attributes including serialized to hash
      # topics inside of it.
      def to_h
        result = {
          topics: topics.map(&:to_h),
          id: id
        }

        Karafka::AttributesMap.consumer_group.each do |attribute|
          result[attribute] = public_send(attribute)
        end

        result
      end
    end
  end
end

Version data entries

24 entries across 24 versions & 1 rubygems

Version Path
karafka-1.2.13 lib/karafka/routing/consumer_group.rb
karafka-1.2.12 lib/karafka/routing/consumer_group.rb
karafka-1.2.11 lib/karafka/routing/consumer_group.rb
karafka-1.2.10 lib/karafka/routing/consumer_group.rb
karafka-1.2.9 lib/karafka/routing/consumer_group.rb
karafka-1.2.8 lib/karafka/routing/consumer_group.rb
karafka-1.2.7 lib/karafka/routing/consumer_group.rb
karafka-1.2.6 lib/karafka/routing/consumer_group.rb
karafka-1.2.5 lib/karafka/routing/consumer_group.rb
karafka-1.2.4 lib/karafka/routing/consumer_group.rb
karafka-1.2.3 lib/karafka/routing/consumer_group.rb
karafka-1.2.2 lib/karafka/routing/consumer_group.rb
karafka-1.2.1 lib/karafka/routing/consumer_group.rb
karafka-1.2.0 lib/karafka/routing/consumer_group.rb
karafka-1.2.0.beta4 lib/karafka/routing/consumer_group.rb
karafka-1.2.0.beta3 lib/karafka/routing/consumer_group.rb
karafka-1.2.0.beta2 lib/karafka/routing/consumer_group.rb
karafka-1.2.0.beta1 lib/karafka/routing/consumer_group.rb
karafka-1.1.2 lib/karafka/routing/consumer_group.rb
karafka-1.1.1 lib/karafka/routing/consumer_group.rb