Sha256: 5e267b94b853aaa1fc2e9c181b8c4d69099b0a40a6f63e83923a912197c61c36

Contents?: true

Size: 1.99 KB

Versions: 2

Compression:

Stored size: 1.99 KB

Contents

# frozen_string_literal: true

module Karafka
  # Simple admin actions that we can perform via Karafka on our Kafka cluster
  #
  # @note It always initializes a new admin instance as we want to ensure it is always closed
  #   Since admin actions are not performed that often, that should be ok.
  #
  # @note It always uses the primary defined cluster and does not support multi-cluster work.
  #   If you need this, just replace the cluster info for the time you use this
  module Admin
    class << self
      # Creates Kafka topic with given settings
      #
      # @param name [String] topic name
      # @param partitions [Integer] number of partitions we expect
      # @param replication_factor [Integer] number of replicas
      # @param topic_config [Hash] topic config details as described here:
      #   https://kafka.apache.org/documentation/#topicconfigs
      def create_topic(name, partitions, replication_factor, topic_config = {})
        with_admin do |admin|
          admin.create_topic(name, partitions, replication_factor, topic_config)

          sleep(0.1) until topics_names.include?(name)
        end
      end

      # Deleted a given topic
      #
      # @param name [String] topic name
      def delete_topic(name)
        with_admin do |admin|
          admin.delete_topic(name)

          sleep(0.1) while topics_names.include?(name)
        end
      end

      # @return [Rdkafka::Metadata] cluster metadata info
      def cluster_info
        with_admin do |admin|
          Rdkafka::Metadata.new(admin.instance_variable_get('@native_kafka'))
        end
      end

      private

      # @return [Array<String>] topics names
      def topics_names
        cluster_info.topics.map { |topic| topic.fetch(:topic_name) }
      end

      # Creates admin instance and yields it. After usage it closes the admin instance
      def with_admin
        admin = ::Rdkafka::Config.new(Karafka::App.config.kafka).admin
        result = yield(admin)
        admin.close
        result
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
karafka-2.0.5 lib/karafka/admin.rb
karafka-2.0.4 lib/karafka/admin.rb