Sha256: 1574f39ff5439d040f914c808a47f85d8ae2a9f7320e304d04af515ffd143030

Contents?: true

Size: 1.63 KB

Versions: 10

Compression:

Stored size: 1.63 KB

Contents

#!/usr/bin/env ruby

require_relative '../config/environment'

# Command line interface (built on Thor) that publishes the lines of one or
# more files as individual messages to an Apache Kafka topic.
class ProduceTopic < Thor
  default_command :produce

  desc 'TOPIC FILE...', 'produce a new event at a given Apache Kafka topic'
  option :partition, aliases: '-p', type: :numeric,
         desc: 'The topic partitions to write to'
  option :partition_key, aliases: '-k', type: :string,
         desc: 'The partition key to use to select the partition'
  option :verbose, aliases: '-v', type: :boolean,
         desc: 'Enable verbose outputs'

  # Produce every line of the given files as a separate message.
  #
  # Exits with status 1 when both +--partition+ and +--partition_key+ are
  # given, or when no usable input file remains after validation.
  #
  # @param topic [String] the name of the topic to produce to
  # @param files [Array<String>] paths of the files to read; +-+ selects
  #   stdin, non-existing files are reported and skipped
  def produce(topic, *files)
    # NOTE(review): +debug!+ is defined outside of this file; presumably it
    # configures verbose output from the parsed options - confirm.
    debug! options

    # Validate the mutually exclusive options before doing anything else.
    if options.key?(:partition) && options.key?(:partition_key)
      STDERR.puts 'Either use the fixed partition or a partition key.'
      STDERR.puts 'But not both together.'
      exit 1
    end

    # Unset options are dropped so they are not passed to the producer.
    opts = {
      topic: topic,
      partition: options[:partition]&.to_i,
      partition_key: options[:partition_key]
    }.compact

    files = files.map do |file|
      next '/dev/stdin' if file == '-'

      unless File.file? file
        STDERR.puts "File '#{file}' does not exist."
        next
      end

      file
    end.compact.uniq

    if files.empty?
      STDERR.puts 'No files given or exists.'
      STDERR.puts 'You have to specify file(s) or use `-\' for stdin.'
      exit 1
    end

    producer = KafkaClient.producer

    files.each do |file|
      puts "Processing lines of '#{file}' .."
      File.open(file, 'r') do |f|
        f.each_line do |line|
          puts line
          producer.produce(line, **opts)
          puts
        end
      end
      # Deliver once per file, after all of its lines were handed over.
      producer.deliver_messages
    end
  rescue Interrupt
    # On Ctrl-C, deliver whatever was already handed to the producer. The
    # producer is still nil when the interrupt arrives before it was created,
    # so guard the call instead of failing with a NoMethodError.
    producer&.deliver_messages
  end
end
# Run the command line interface. `args!` is not defined in this file -
# presumably it is provided via config/environment and returns the argument
# list to parse; TODO confirm.
ProduceTopic.start(args!)

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
rimless-1.8.0 doc/kafka-playground/bin/produce-event
rimless-1.7.7 doc/kafka-playground/bin/produce-event
rimless-1.7.6 doc/kafka-playground/bin/produce-event
rimless-1.7.5 doc/kafka-playground/bin/produce-event
rimless-1.7.4 doc/kafka-playground/bin/produce-event
rimless-1.7.3 doc/kafka-playground/bin/produce-event
rimless-1.7.2 doc/kafka-playground/bin/produce-event
rimless-1.7.1 doc/kafka-playground/bin/produce-event
rimless-1.7.0 doc/kafka-playground/bin/produce-event
rimless-1.6.0 doc/kafka-playground/bin/produce-event