#!/usr/bin/env ruby require "kafkr" require "find" require "digest" # Accepting command line arguments for host and port host = ARGV[0] || "localhost" port = ARGV[1] || 4000 timeout = ARGV[2] || 300 Kafkr.log "Running on host: #{host} and port: #{port}" $current_consumer = nil $restart_required = false $handlers_loaded = false $handlers_changed = false $loaded_handlers = {} Signal.trap("USR1") do $restart_required = true end def stop_consumer $current_consumer = nil if $current_consumer end def list_registered_handlers Kafkr::Consumer.handlers.each do |handler| $loaded_handlers = {} handler_name = handler.class.name.split("::").last.gsub(/Handler$/, "") Kafkr.log "#{handler_name} handler registered." end end def start_consumer(port, host, timeout) Kafkr.log "Starting consumer on port #{port}! timeout: #{timeout}" $handlers_changed = false Kafkr::Consumer.configure do |config| config.port = port config.host = host config.timeout = timeout end unless $handlers_loaded Kafkr::Consumer.load_handlers if $handlers_changed == false list_registered_handlers $handlers_loaded = true end $current_consumer = Kafkr::Consumer.new $current_consumer.listen do |message| Kafkr.log "Message consumed: #{message}"# Processing of the message end end def reload_handlers(file_checksums) handlers_before_reload = Kafkr::Consumer.handlers.dup # Store current handlers Find.find(Kafkr::Consumer::HANDLERS_DIRECTORY) do |path| next unless File.file?(path) load path end if $handlers_changed Kafkr::Consumer.load_handlers new_handlers = Kafkr::Consumer.handlers - handlers_before_reload if new_handlers.any? new_handlers.each do |handler| handler_name = handler.class.name.split("::").last.gsub(/Handler$/, "").capitalize Kafkr.log "#{handler_name} handler updated - ok!" end end end end def monitor_handlers(file_checksums) loop do changed = false Find.find(Kafkr::Consumer::HANDLERS_DIRECTORY) do |path| next unless File.file?(path) current_checksum = Digest::MD5.file(path).hexdigest if file_checksums[path] != current_checksum file_checksums[path] = current_checksum changed = true end end $handlers_changed = changed # Set outside the loop reload_handlers(file_checksums) if $handlers_changed sleep 5 end end file_checksums = {} monitoring_thread = Thread.new { monitor_handlers(file_checksums) } start_consumer(port, host,timeout) # Pass the port here begin loop do if $restart_required stop_consumer start_consumer(port, host,timeout) $restart_required = false end sleep 1 end rescue LoadError => e exit(1) rescue => e exit(1) rescue Interrupt stop_consumer exit(0) ensure monitoring_thread.kill if monitoring_thread end