Sha256: 8ced07322e581b763dff46b9a461fbadc35fb988ff3c5894353f1e70573c402d
Contents?: true
Size: 1.38 KB
Versions: 19
Compression:
Stored size: 1.38 KB
Contents
# frozen_string_literal: true

module Sbmt
  module KafkaConsumer
    # Thor-based command-line entry point for the kafka_consumer worker.
    #
    # Exposes a single command, +start+, which is also the default command.
    class CLI < Thor
      class << self
        # Always answers +true+.
        # NOTE(review): presumably read by Thor to decide whether a failed
        # command exits the process with a non-zero status — confirm against
        # the Thor version in use.
        def exit_on_failure?
          true
        end
      end

      default_command :start

      desc "start", "Start kafka_consumer worker"
      option :consumer_group_id,
        aliases: "-g",
        desc: "Consumer group id to start",
        repeatable: true
      # NOTE(review): a default is declared here, so options[:concurrency] is
      # presumably always populated even when the flag is omitted — confirm
      # that is the intended interaction with the global kafka.concurrency.
      option :concurrency,
        aliases: "-c",
        type: :numeric,
        default: 5,
        desc: "Number of threads, overrides global kafka.concurrency config"
      # Boots the worker: prints a banner, loads the optional Kafkafile,
      # configures the client from the CLI options, starts the probes host
      # asynchronously and finally runs the server.
      def start
        $stdout.puts "Initializing KafkaConsumer"
        $stdout.puts "Version: #{VERSION}"

        load_environment

        # Switch stdout to unbuffered mode before the remaining progress output.
        $stdout.sync = true

        $stdout.puts "Configuring client"
        group_ids = options[:consumer_group_id]
        thread_count = options[:concurrency]
        ClientConfigurer.configure!(consumer_groups: group_ids, concurrency: thread_count)
        $stdout.puts "Client configured routes: #{ClientConfigurer.routes.inspect}"

        $stdout.puts "Starting probes/metrics http-server"
        Sbmt::KafkaConsumer::Probes::Host.run_async

        Sbmt::KafkaConsumer::Server.run
      end

      private

      # Loads the Kafkafile when one exists.
      #
      # The path is taken from the KAFKAFILE environment variable; when that
      # variable is not set, "Kafkafile" in the current working directory is
      # used. A missing file is silently skipped.
      def load_environment
        kafkafile = ENV.fetch("KAFKAFILE") { "#{Dir.pwd}/Kafkafile" }
        return unless File.exist?(kafkafile)

        $stdout.puts "Loading env from Kafkafile: #{kafkafile}"
        load(kafkafile)
      end
    end
  end
end
Version data entries
19 entries across 19 versions & 1 rubygems