lib/invoker/commander.rb in invoker-1.0.4 vs lib/invoker/commander.rb in invoker-1.1.0

- old
+ new

@@ -1,132 +1,51 @@ require "io/console" require 'pty' require "json" require "dotenv" +require "forwardable" module Invoker class Commander - MAX_PROCESS_COUNT = 10 - LABEL_COLORS = [:green, :yellow, :blue, :magenta, :cyan] - attr_accessor :reactor, :workers, :thread_group, :open_pipes - attr_accessor :event_manager, :runnables + attr_accessor :reactor, :process_manager + attr_accessor :event_manager, :runnables, :thread_group + extend Forwardable - def initialize - # mapping between open pipes and worker classes - @open_pipes = {} + def_delegators :@process_manager, :start_process_by_name, :stop_process + def_delegators :@process_manager, :restart_process, :get_worker_from_fd, :process_list - # mapping between command label and worker classes - @workers = {} + def_delegators :@event_manager, :schedule_event, :trigger + def_delegator :@reactor, :watch_for_read - @thread_group = ThreadGroup.new() - @worker_mutex = Mutex.new() + def initialize + @thread_group = ThreadGroup.new + @runnable_mutex = Mutex.new - @event_manager = Invoker::Event::Manager.new() + @event_manager = Invoker::Event::Manager.new @runnables = [] @reactor = Invoker::Reactor.new + @process_manager = Invoker::ProcessManager.new Thread.abort_on_exception = true end # Start the invoker process supervisor. This method starts a unix server # in separate thread that listens for incoming commands. def start_manager - if !Invoker::CONFIG.processes || Invoker::CONFIG.processes.empty? - raise Invoker::Errors::InvalidConfig.new("No processes configured in config file") - end - install_interrupt_handler() - unix_server_thread = Thread.new { Invoker::CommandListener::Server.new() } - thread_group.add(unix_server_thread) - run_power_server() - Invoker::CONFIG.processes.each { |process_info| add_command(process_info) } - at_exit { kill_workers } - start_event_loop() + verify_process_configuration + daemonize_app if Invoker.daemonize? + install_interrupt_handler + unix_server_thread = Thread.new { Invoker::IPC::Server.new } + @thread_group.add(unix_server_thread) + process_manager.run_power_server + Invoker.config.processes.each { |process_info| process_manager.start_process(process_info) } + at_exit { process_manager.kill_workers } + start_event_loop end - # Start given command and start a background thread to wait on the process - # - # @param process_info [OpenStruct(command, directory)] - def add_command(process_info) - m, s = PTY.open - s.raw! # disable newline conversion. - - pid = run_command(process_info, s) - - s.close() - - worker = Invoker::CommandWorker.new(process_info.label, m, pid, select_color()) - - add_worker(worker) - wait_on_pid(process_info.label,pid) - end - - # List currently running commands - def list_commands - Invoker::ProcessPrinter.to_json(workers) - end - - # Start executing given command by their label name. - # - # @param command_label [String] Command label of process specified in config file. - def add_command_by_label(command_label) - if process_running?(command_label) - Invoker::Logger.puts "\nProcess '#{command_label}' is already running".color(:red) - return false - end - - process_info = Invoker::CONFIG.process(command_label) - if process_info - add_command(process_info) - end - end - - # Reload a process given by command label - # - # @params command_label [String] Command label of process specified in config file. - def reload_command(command_label, rest_args) - if remove_command(command_label, rest_args) - event_manager.schedule_event(command_label, :worker_removed) { - add_command_by_label(command_label) - } - else - add_command_by_label(command_label) - end - end - - # Remove a process from list of processes managed by invoker supervisor.It also - # kills the process before removing it from the list. - # - # @param command_label [String] Command label of process specified in config file - # @param rest_args [Array] Additional option arguments, such as signal that can be used. - # @return [Boolean] if process existed and was removed else false - def remove_command(command_label, rest_args) - worker = workers[command_label] - return false unless worker - signal_to_use = rest_args ? Array(rest_args).first : 'INT' - - Invoker::Logger.puts("Removing #{command_label} with signal #{signal_to_use}".color(:red)) - kill_or_remove_process(worker.pid, signal_to_use, command_label) - end - - # Given a file descriptor returns the worker object - # - # @param fd [IO] an IO object with valid file descriptor - # @return [Invoker::CommandWorker] The worker object which is associated with this fd - def get_worker_from_fd(fd) - open_pipes[fd.fileno] - end - - # Given a command label returns the associated worker object - # - # @param label [String] Command label of the command - # @return [Invoker::CommandWorker] The worker object which is associated with this command - def get_worker_from_label(label) - workers[label] - end - def on_next_tick(*args, &block) - @worker_mutex.synchronize do + @runnable_mutex.synchronize do @runnables << OpenStruct.new(:args => args, :block => block) end end def run_runnables @@ -134,160 +53,39 @@ instance_exec(*runnable.args, &runnable.block) end @runnables = [] end - def run_power_server - return unless Invoker.can_run_balancer?(false) + private - powerup_id = Invoker::Power::Powerup.fork_and_start() - wait_on_pid("powerup_manager", powerup_id) - at_exit { - begin - Process.kill("INT", powerup_id) - rescue Errno::ESRCH; end - } - end - - def load_env(directory = nil) - directory ||= ENV['PWD'] - default_env = File.join(directory, '.env') - if File.exist?(default_env) - Dotenv::Environment.new(default_env) - else - {} + def verify_process_configuration + if !Invoker.config.processes || Invoker.config.processes.empty? + raise Invoker::Errors::InvalidConfig.new("No processes configured in config file") end end - private def start_event_loop loop do - reactor.watch_on_pipe() - run_runnables() - run_scheduled_events() + reactor.monitor_for_fd_events + run_runnables + run_scheduled_events end end def run_scheduled_events event_manager.run_scheduled_events do |event| - event.block.call() + event.block.call end end - def kill_or_remove_process(pid, signal_to_use, command_label) - process_kill(pid, signal_to_use) - true - rescue Errno::ESRCH - Invoker::Logger.puts("Killing process with #{pid} and name #{command_label} failed".color(:red)) - remove_worker(command_label, false) - false - end - - def process_kill(pid, signal_to_use) - if signal_to_use.to_i == 0 - Process.kill(signal_to_use, -Process.getpgid(pid)) - else - Process.kill(signal_to_use.to_i, -Process.getpgid(pid)) - end - end - - def select_color - selected_color = LABEL_COLORS.shift() - LABEL_COLORS.push(selected_color) - selected_color - end - - # Remove worker from all collections - def remove_worker(command_label, trigger_event = true) - worker = @workers[command_label] - if worker - @open_pipes.delete(worker.pipe_end.fileno) - @workers.delete(command_label) - end - if trigger_event - event_manager.trigger(command_label, :worker_removed) - end - end - - # add worker to global collections - def add_worker(worker) - @open_pipes[worker.pipe_end.fileno] = worker - @workers[worker.command_label] = worker - @reactor.add_to_monitor(worker.pipe_end) - end - - def run_command(process_info, write_pipe) - command_label = process_info.label - - event_manager.schedule_event(command_label, :exit) { remove_worker(command_label) } - - env_options = load_env(process_info.dir) - - spawn_options = { - :chdir => process_info.dir || ENV['PWD'], :out => write_pipe, :err => write_pipe, - :pgroup => true, :close_others => true, :in => :close - } - - if defined?(Bundler) - Bundler.with_clean_env do - spawn(env_options, process_info.cmd, spawn_options) - end - else - spawn(env_options, process_info.cmd, spawn_options) - end - end - - def wait_on_pid(command_label,pid) - raise Invoker::Errors::ToomanyOpenConnections if @thread_group.enclosed? - - thread = Thread.new do - Process.wait(pid) - message = "Process with command #{command_label} exited with status #{$?.exitstatus}" - Invoker::Logger.puts("\n#{message}".color(:red)) - notify_user(message) - event_manager.trigger(command_label, :exit) - end - @thread_group.add(thread) - end - - def notify_user(message) - if defined?(Bundler) - Bundler.with_clean_env do - check_and_notify_with_terminal_notifier(message) - end - else - check_and_notify_with_terminal_notifier(message) - end - end - - def check_and_notify_with_terminal_notifier(message) - return unless Invoker.darwin? - - command_path = `which terminal-notifier` - if command_path && !command_path.empty? - system("terminal-notifier -message '#{message}' -title Invoker") - end - end - def install_interrupt_handler Signal.trap("INT") do - kill_workers() + process_manager.kill_workers exit(0) end end - def kill_workers - @workers.each {|key,worker| - begin - Process.kill("INT", -Process.getpgid(worker.pid)) - rescue Errno::ESRCH - puts "Error killing #{key}" - end - } - @workers = {} - end - - def process_running?(command_label) - !!workers[command_label] + def daemonize_app + Invoker.daemon.start end end end