Sha256: 8b2ad572617708a450203c7c9bc85ac3b93ef0d2b9fbd699870e42a886e34b70

Contents?: true

Size: 1.58 KB

Versions: 1

Compression:

Stored size: 1.58 KB

Contents

require "optparse"
require "racecar/config_loader"

module Racecar
  # Command-line controller behind `racecarctl`.
  #
  # Each public instance method defined here is a sub-command: `main` takes
  # the first CLI argument as the command name and calls the method of that
  # name with the remaining arguments.
  class Ctl
    # Holder for the options collected by the `produce` sub-command.
    ProduceMessage = Struct.new(:value, :key, :topic)

    # Entry point: dispatches `args` to the matching sub-command.
    #
    # Only public methods that are not inherited from Object count as
    # commands. A plain `respond_to?` check would also accept names such as
    # "inspect" or "instance_eval", turning user input into arbitrary method
    # calls and producing confusing ArgumentError/TypeError failures instead
    # of an "invalid command" error.
    #
    # @param args [Array<String>] command name followed by its arguments;
    #   the command name is shifted off the array.
    # @return whatever the sub-command returns
    # @raise [Racecar::Error] if no command is given or it is not a command
    def self.main(args)
      command = args.shift or raise Racecar::Error, "no command specified"

      commands = (public_instance_methods - Object.public_instance_methods).map(&:to_s)

      unless commands.include?(command)
        raise Racecar::Error, "invalid command: #{command}"
      end

      new.public_send(command, args)
    end

    # Sub-command `produce`: delivers a single message to a Kafka topic.
    #
    # Recognised options: -v/--value VALUE, -k/--key KEY, -t/--topic TOPIC.
    # The topic and value are required; the key is optional.
    #
    # @param args [Array<String>] command-line options; recognised options
    #   are removed from the array by `OptionParser#parse!`.
    # @raise [Racecar::Error] if the topic or the message value is missing
    def produce(args)
      message = ProduceMessage.new

      parser = OptionParser.new do |opts|
        opts.banner = "Usage: racecarctl produce [options]"

        opts.on("-v", "--value VALUE", "Set the message value") do |value|
          message.value = value
        end

        opts.on("-k", "--key KEY", "Set the message key") do |key|
          message.key = key
        end

        opts.on("-t", "--topic TOPIC", "Set the message topic") do |topic|
          message.topic = topic
        end
      end

      parser.parse!(args)

      # Check the required options before loading config or touching Kafka.
      raise Racecar::Error, "no topic specified" if message.topic.nil?
      raise Racecar::Error, "no message value specified" if message.value.nil?

      ConfigLoader.load!

      Racecar.config.validate!

      # NOTE(review): `Kafka` is not required in this file; presumably it is
      # loaded elsewhere (e.g. by the main racecar entry point) -- confirm.
      kafka = Kafka.new(
        client_id: Racecar.config.client_id,
        seed_brokers: Racecar.config.brokers,
        logger: Racecar.logger,
        connect_timeout: Racecar.config.connect_timeout,
        socket_timeout: Racecar.config.socket_timeout,
      )

      kafka.deliver_message(message.value, key: message.key, topic: message.topic)

      puts "=> Delivered message to Kafka cluster"
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
racecar-0.2.1 lib/racecar/ctl.rb