Sha256: 8f2c22205c7a9ea596aa5f309759f4712e045665f8b5dc47f2defbbeb24bbafd

Contents?: true

Size: 1.8 KB

Versions: 2

Compression:

Stored size: 1.8 KB

Contents

# frozen_string_literal: true

module Karafka
  # Simple admin actions that we can perform via Karafka on our Kafka cluster
  #
  # @note It always initializes a new admin instance as we want to ensure it is always closed.
  #   Since admin actions are not performed that often, that should be ok.
  #
  # @note It always uses the primary defined cluster and does not support multi-cluster work.
  #   If you need this, just replace the cluster info for the time you use this
  module Admin
    class << self
      # Creates Kafka topic with given settings
      #
      # @param name [String] topic name
      # @param partitions [Integer] number of partitions we expect
      # @param replication_factor [Integer] number of replicas
      # @param topic_config [Hash] topic config details as described here:
      #   https://kafka.apache.org/documentation/#topicconfigs
      # @return [Object] whatever waiting on the topic creation handle returns
      def create_topic(name, partitions, replication_factor, topic_config = {})
        with_admin do |admin|
          admin
            .create_topic(name, partitions, replication_factor, topic_config)
            .wait
        end
      end

      # Deletes a given topic
      #
      # @param name [String] topic name
      # @return [Object] whatever waiting on the topic deletion handle returns
      def delete_topic(name)
        with_admin do |admin|
          admin
            .delete_topic(name)
            .wait
        end
      end

      # @return [Rdkafka::Metadata] cluster metadata info
      def cluster_info
        with_admin do |admin|
          # The admin does not expose its native handle publicly, so we reach for the ivar
          Rdkafka::Metadata.new(admin.instance_variable_get('@native_kafka'))
        end
      end

      private

      # Creates admin instance and yields it. After usage it closes the admin instance, also
      # when the yielded block raises, so the admin instance is never left open.
      #
      # @yieldparam admin [Object] admin instance built from `Karafka::App.config.kafka`
      # @return [Object] result of the yielded block
      def with_admin
        admin = ::Rdkafka::Config.new(Karafka::App.config.kafka).admin
        yield(admin)
      ensure
        # Safe navigation as building the admin itself could have failed, leaving it nil
        admin&.close
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygem

Version Path
karafka-2.0.3 lib/karafka/admin.rb
karafka-2.0.2 lib/karafka/admin.rb