#!/usr/bin/env ruby

require_relative '../config/environment'

# Command line tool which reads the given file(s) line by line and hands each
# line to a Kafka producer for the given topic.
#
# NOTE(review): `Thor`, `KafkaClient`, `debug!` and `args!` are not defined in
# this file - presumably they are provided by `config/environment`; confirm.
class ProduceTopic < Thor
  default_command :produce

  desc 'TOPIC FILE...', 'produce a new event at a given Apache Kafka topic'
  option :partition, aliases: '-p', type: :numeric,
                     desc: 'The topic partitions to write to'
  option :partition_key, aliases: '-k', type: :string,
                         desc: 'The partition key to use to select the partition'
  option :verbose, aliases: '-v', type: :boolean,
                   desc: 'Enable verbose outputs'

  # Produce one message per input line at the given topic.
  #
  # Exits with status 1 when both a fixed partition and a partition key are
  # given, or when none of the given files can be used. An interrupt
  # (Ctrl-C) stops reading and delivers whatever was handed to the producer
  # so far.
  #
  # @param topic [String] the name of the topic to write to
  # @param files [Array<String>] paths of the files to read, `-` selects stdin
  def produce(topic, *files)
    # Declared up front so the +rescue+ clause below can tell whether the
    # producer was already created when the interrupt arrived.
    producer = nil

    debug! options

    # A fixed partition and a partition key are mutually exclusive
    if options.key?(:partition) && options.key?(:partition_key)
      STDERR.puts 'Either use the fixed partition or a partition key.'
      STDERR.puts 'But not both together.'
      exit 1
    end

    # Options which were not given (nil) are dropped, so they are not passed
    # on to the producer at all
    opts = {
      topic: topic,
      partition: options[:partition]&.to_i,
      partition_key: options[:partition_key]
    }.compact

    # Translate `-` to stdin, report and skip everything which is not a
    # regular file and read each remaining file just once
    files = files.map do |file|
      next '/dev/stdin' if file == '-'

      unless File.file? file
        STDERR.puts "File '#{file}' does not exist."
        next
      end

      file
    end.compact.uniq

    if files.empty?
      STDERR.puts 'No files given or exists.'
      STDERR.puts 'You have to specify file(s) or use `-\' for stdin.'
      exit 1
    end

    producer = KafkaClient.producer

    files.each do |file|
      puts "Processing lines of '#{file}' .."

      File.open(file, 'r') do |f|
        # The line is passed on as read, including its line terminator
        f.each_line do |line|
          puts line
          producer.produce(line, **opts)
          puts
        end
      end

      # Messages are delivered once per file
      producer.deliver_messages
    end
  rescue Interrupt
    # The interrupt may arrive before the producer was created, in this case
    # there is nothing to deliver
    producer&.deliver_messages
  end
end

ProduceTopic.start(args!)