Sha256: 3f4dc311fb70fa599bf6c317d9b8431c35c9b4b5945adaaf327ea024a5a05304

Contents?: true

Size: 1.24 KB

Versions: 6

Compression:

Stored size: 1.24 KB

Contents

#!/usr/bin/env ruby

require_relative '../config/environment'

class CreateTopic < Thor
  default_command :create

  desc 'NAME [CONFIGS...]', 'create a new Apache Kafka topic'
  option :partitions, aliases: '-p', type: :numeric, default: 1,
         desc: 'The number of partitions'
  option :replicas, aliases: '-r', type: :numeric, default: 1,
         desc: 'The number of replications'
  option :verbose, aliases: '-v', type: :boolean,
         desc: 'Enable verbose outputs'
  def create(name, *configs)
    debug! options

    opts = {
      num_partitions: options[:partitions].to_i,
      replication_factor: options[:replicas].to_i,
    }
    config = configs.map { |conf| conf.split('=').map(&:strip) }.to_h

    if topic?(name)
      STDERR.puts "The topic '#{name}' already exists."
      puts JSON.pretty_generate(@topic_conf)
      exit
    end

    # Create the topic
    KafkaClient.create_topic(name, **opts, config: config)

    # Fetch the topic config
    puts JSON.pretty_generate(KafkaClient.describe_topic(name))
  rescue Kafka::InvalidConfig
    STDOUT.puts "Could not create the topic '#{name}'."
    STDOUT.puts "The given configuration is invalid:\n\n"
    puts JSON.pretty_generate(config)
    exit 1
  end
end
CreateTopic.start(args!)

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
rimless-1.5.1 doc/kafka-playground/bin/create-topic
rimless-1.5.0 doc/kafka-playground/bin/create-topic
rimless-1.4.2 doc/kafka-playground/bin/create-topic
rimless-1.4.1 doc/kafka-playground/bin/create-topic
rimless-1.4.0 doc/kafka-playground/bin/create-topic
rimless-1.3.0 doc/kafka-playground/bin/create-topic