Sha256: 8f2c22205c7a9ea596aa5f309759f4712e045665f8b5dc47f2defbbeb24bbafd
Contents?: true
Size: 1.8 KB
Versions: 2
Compression:
Stored size: 1.8 KB
Contents
# frozen_string_literal: true

module Karafka
  # Simple admin actions that we can perform via Karafka on our Kafka cluster
  #
  # @note It always initializes a new admin instance as we want to ensure it is always closed
  #   Since admin actions are not performed that often, that should be ok.
  #
  # @note It always uses the primary defined cluster and does not support multi-cluster work.
  #   If you need this, just replace the cluster info for the time you use this
  module Admin
    class << self
      # Creates Kafka topic with given settings
      #
      # @param name [String] topic name
      # @param partitions [Integer] number of partitions we expect
      # @param replication_factor [Integer] number of replicas
      # @param topic_config [Hash] topic config details as described here:
      #   https://kafka.apache.org/documentation/#topicconfigs
      def create_topic(name, partitions, replication_factor, topic_config = {})
        with_admin do |admin|
          admin
            .create_topic(name, partitions, replication_factor, topic_config)
            .wait
        end
      end

      # Deletes a given topic
      #
      # @param name [String] topic name
      def delete_topic(name)
        with_admin do |admin|
          admin
            .delete_topic(name)
            .wait
        end
      end

      # @return [Rdkafka::Metadata] cluster metadata info
      def cluster_info
        with_admin do |admin|
          Rdkafka::Metadata.new(admin.instance_variable_get('@native_kafka'))
        end
      end

      private

      # Creates admin instance and yields it. After usage it closes the admin instance
      def with_admin
        admin = ::Rdkafka::Config.new(Karafka::App.config.kafka).admin
        result = yield(admin)
        admin.close
        result
      end
    end
  end
end
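The first `@note` says the admin instance should always be closed, yet `with_admin` as listed calls `admin.close` only after the block returns, so an exception raised inside the block would skip the close. The following is a minimal sketch of an `ensure`-based variant, not the code shipped in these versions. `FakeAdmin` is a hypothetical stand-in for the Rdkafka admin client so that the example runs without a broker, and the `admin` parameter is likewise an assumption added for illustration.

```ruby
# Hypothetical stand-in for the Rdkafka admin client (not part of Karafka or rdkafka).
# It only records whether #close was called.
class FakeAdmin
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Yields an admin instance and closes it even when the block raises.
# The return value is the block's result, because `ensure` does not
# change what the method returns.
def with_admin(admin = FakeAdmin.new)
  yield(admin)
ensure
  admin&.close
end

# Successful block: the result is passed through and the admin is closed.
ok_admin = FakeAdmin.new
result = with_admin(ok_admin) { |_admin| :done }

# Failing block: the error still propagates, but the admin is closed first.
failing_admin = FakeAdmin.new
begin
  with_admin(failing_admin) { |_admin| raise ArgumentError, 'topic name missing' }
rescue ArgumentError => e
  error_message = e.message
end
```

In both cases `closed` ends up `true`. The trade-off is that any error raised by `close` itself would replace the original exception from the block.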
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
karafka-2.0.3 | lib/karafka/admin.rb |
karafka-2.0.2 | lib/karafka/admin.rb |