Sha256: 3f4dc311fb70fa599bf6c317d9b8431c35c9b4b5945adaaf327ea024a5a05304
Contents?: true
Size: 1.24 KB
Versions: 6
Compression:
Stored size: 1.24 KB
Contents
#!/usr/bin/env ruby require_relative '../config/environment' class CreateTopic < Thor default_command :create desc 'NAME [CONFIGS...]', 'create a new Apache Kafka topic' option :partitions, aliases: '-p', type: :numeric, default: 1, desc: 'The number of partitions' option :replicas, aliases: '-r', type: :numeric, default: 1, desc: 'The number of replications' option :verbose, aliases: '-v', type: :boolean, desc: 'Enable verbose outputs' def create(name, *configs) debug! options opts = { num_partitions: options[:partitions].to_i, replication_factor: options[:replicas].to_i, } config = configs.map { |conf| conf.split('=').map(&:strip) }.to_h if topic?(name) STDERR.puts "The topic '#{name}' already exists." puts JSON.pretty_generate(@topic_conf) exit end # Create the topic KafkaClient.create_topic(name, **opts, config: config) # Fetch the topic config puts JSON.pretty_generate(KafkaClient.describe_topic(name)) rescue Kafka::InvalidConfig STDOUT.puts "Could not create the topic '#{name}'." STDOUT.puts "The given configuration is invalid:\n\n" puts JSON.pretty_generate(config) exit 1 end end CreateTopic.start(args!)
Version data entries
6 entries across 6 versions & 1 rubygems