lib/invoker/commander.rb in invoker-1.0.4 vs lib/invoker/commander.rb in invoker-1.1.0
- old
+ new
@@ -1,132 +1,51 @@
require "io/console"
require 'pty'
require "json"
require "dotenv"
+require "forwardable"
module Invoker
class Commander
- MAX_PROCESS_COUNT = 10
- LABEL_COLORS = [:green, :yellow, :blue, :magenta, :cyan]
- attr_accessor :reactor, :workers, :thread_group, :open_pipes
- attr_accessor :event_manager, :runnables
+ attr_accessor :reactor, :process_manager
+ attr_accessor :event_manager, :runnables, :thread_group
+ extend Forwardable
- def initialize
- # mapping between open pipes and worker classes
- @open_pipes = {}
+ def_delegators :@process_manager, :start_process_by_name, :stop_process
+ def_delegators :@process_manager, :restart_process, :get_worker_from_fd, :process_list
- # mapping between command label and worker classes
- @workers = {}
+ def_delegators :@event_manager, :schedule_event, :trigger
+ def_delegator :@reactor, :watch_for_read
- @thread_group = ThreadGroup.new()
- @worker_mutex = Mutex.new()
+ def initialize
+ @thread_group = ThreadGroup.new
+ @runnable_mutex = Mutex.new
- @event_manager = Invoker::Event::Manager.new()
+ @event_manager = Invoker::Event::Manager.new
@runnables = []
@reactor = Invoker::Reactor.new
+ @process_manager = Invoker::ProcessManager.new
Thread.abort_on_exception = true
end
# Start the invoker process supervisor. This method starts a unix server
# in separate thread that listens for incoming commands.
def start_manager
- if !Invoker::CONFIG.processes || Invoker::CONFIG.processes.empty?
- raise Invoker::Errors::InvalidConfig.new("No processes configured in config file")
- end
- install_interrupt_handler()
- unix_server_thread = Thread.new { Invoker::CommandListener::Server.new() }
- thread_group.add(unix_server_thread)
- run_power_server()
- Invoker::CONFIG.processes.each { |process_info| add_command(process_info) }
- at_exit { kill_workers }
- start_event_loop()
+ verify_process_configuration
+ daemonize_app if Invoker.daemonize?
+ install_interrupt_handler
+ unix_server_thread = Thread.new { Invoker::IPC::Server.new }
+ @thread_group.add(unix_server_thread)
+ process_manager.run_power_server
+ Invoker.config.processes.each { |process_info| process_manager.start_process(process_info) }
+ at_exit { process_manager.kill_workers }
+ start_event_loop
end
- # Start given command and start a background thread to wait on the process
- #
- # @param process_info [OpenStruct(command, directory)]
- def add_command(process_info)
- m, s = PTY.open
- s.raw! # disable newline conversion.
-
- pid = run_command(process_info, s)
-
- s.close()
-
- worker = Invoker::CommandWorker.new(process_info.label, m, pid, select_color())
-
- add_worker(worker)
- wait_on_pid(process_info.label,pid)
- end
-
- # List currently running commands
- def list_commands
- Invoker::ProcessPrinter.to_json(workers)
- end
-
- # Start executing given command by their label name.
- #
- # @param command_label [String] Command label of process specified in config file.
- def add_command_by_label(command_label)
- if process_running?(command_label)
- Invoker::Logger.puts "\nProcess '#{command_label}' is already running".color(:red)
- return false
- end
-
- process_info = Invoker::CONFIG.process(command_label)
- if process_info
- add_command(process_info)
- end
- end
-
- # Reload a process given by command label
- #
- # @params command_label [String] Command label of process specified in config file.
- def reload_command(command_label, rest_args)
- if remove_command(command_label, rest_args)
- event_manager.schedule_event(command_label, :worker_removed) {
- add_command_by_label(command_label)
- }
- else
- add_command_by_label(command_label)
- end
- end
-
- # Remove a process from list of processes managed by invoker supervisor.It also
- # kills the process before removing it from the list.
- #
- # @param command_label [String] Command label of process specified in config file
- # @param rest_args [Array] Additional option arguments, such as signal that can be used.
- # @return [Boolean] if process existed and was removed else false
- def remove_command(command_label, rest_args)
- worker = workers[command_label]
- return false unless worker
- signal_to_use = rest_args ? Array(rest_args).first : 'INT'
-
- Invoker::Logger.puts("Removing #{command_label} with signal #{signal_to_use}".color(:red))
- kill_or_remove_process(worker.pid, signal_to_use, command_label)
- end
-
- # Given a file descriptor returns the worker object
- #
- # @param fd [IO] an IO object with valid file descriptor
- # @return [Invoker::CommandWorker] The worker object which is associated with this fd
- def get_worker_from_fd(fd)
- open_pipes[fd.fileno]
- end
-
- # Given a command label returns the associated worker object
- #
- # @param label [String] Command label of the command
- # @return [Invoker::CommandWorker] The worker object which is associated with this command
- def get_worker_from_label(label)
- workers[label]
- end
-
def on_next_tick(*args, &block)
- @worker_mutex.synchronize do
+ @runnable_mutex.synchronize do
@runnables << OpenStruct.new(:args => args, :block => block)
end
end
def run_runnables
@@ -134,160 +53,39 @@
instance_exec(*runnable.args, &runnable.block)
end
@runnables = []
end
- def run_power_server
- return unless Invoker.can_run_balancer?(false)
+ private
- powerup_id = Invoker::Power::Powerup.fork_and_start()
- wait_on_pid("powerup_manager", powerup_id)
- at_exit {
- begin
- Process.kill("INT", powerup_id)
- rescue Errno::ESRCH; end
- }
- end
-
- def load_env(directory = nil)
- directory ||= ENV['PWD']
- default_env = File.join(directory, '.env')
- if File.exist?(default_env)
- Dotenv::Environment.new(default_env)
- else
- {}
+ def verify_process_configuration
+ if !Invoker.config.processes || Invoker.config.processes.empty?
+ raise Invoker::Errors::InvalidConfig.new("No processes configured in config file")
end
end
- private
def start_event_loop
loop do
- reactor.watch_on_pipe()
- run_runnables()
- run_scheduled_events()
+ reactor.monitor_for_fd_events
+ run_runnables
+ run_scheduled_events
end
end
def run_scheduled_events
event_manager.run_scheduled_events do |event|
- event.block.call()
+ event.block.call
end
end
- def kill_or_remove_process(pid, signal_to_use, command_label)
- process_kill(pid, signal_to_use)
- true
- rescue Errno::ESRCH
- Invoker::Logger.puts("Killing process with #{pid} and name #{command_label} failed".color(:red))
- remove_worker(command_label, false)
- false
- end
-
- def process_kill(pid, signal_to_use)
- if signal_to_use.to_i == 0
- Process.kill(signal_to_use, -Process.getpgid(pid))
- else
- Process.kill(signal_to_use.to_i, -Process.getpgid(pid))
- end
- end
-
- def select_color
- selected_color = LABEL_COLORS.shift()
- LABEL_COLORS.push(selected_color)
- selected_color
- end
-
- # Remove worker from all collections
- def remove_worker(command_label, trigger_event = true)
- worker = @workers[command_label]
- if worker
- @open_pipes.delete(worker.pipe_end.fileno)
- @workers.delete(command_label)
- end
- if trigger_event
- event_manager.trigger(command_label, :worker_removed)
- end
- end
-
- # add worker to global collections
- def add_worker(worker)
- @open_pipes[worker.pipe_end.fileno] = worker
- @workers[worker.command_label] = worker
- @reactor.add_to_monitor(worker.pipe_end)
- end
-
- def run_command(process_info, write_pipe)
- command_label = process_info.label
-
- event_manager.schedule_event(command_label, :exit) { remove_worker(command_label) }
-
- env_options = load_env(process_info.dir)
-
- spawn_options = {
- :chdir => process_info.dir || ENV['PWD'], :out => write_pipe, :err => write_pipe,
- :pgroup => true, :close_others => true, :in => :close
- }
-
- if defined?(Bundler)
- Bundler.with_clean_env do
- spawn(env_options, process_info.cmd, spawn_options)
- end
- else
- spawn(env_options, process_info.cmd, spawn_options)
- end
- end
-
- def wait_on_pid(command_label,pid)
- raise Invoker::Errors::ToomanyOpenConnections if @thread_group.enclosed?
-
- thread = Thread.new do
- Process.wait(pid)
- message = "Process with command #{command_label} exited with status #{$?.exitstatus}"
- Invoker::Logger.puts("\n#{message}".color(:red))
- notify_user(message)
- event_manager.trigger(command_label, :exit)
- end
- @thread_group.add(thread)
- end
-
- def notify_user(message)
- if defined?(Bundler)
- Bundler.with_clean_env do
- check_and_notify_with_terminal_notifier(message)
- end
- else
- check_and_notify_with_terminal_notifier(message)
- end
- end
-
- def check_and_notify_with_terminal_notifier(message)
- return unless Invoker.darwin?
-
- command_path = `which terminal-notifier`
- if command_path && !command_path.empty?
- system("terminal-notifier -message '#{message}' -title Invoker")
- end
- end
-
def install_interrupt_handler
Signal.trap("INT") do
- kill_workers()
+ process_manager.kill_workers
exit(0)
end
end
- def kill_workers
- @workers.each {|key,worker|
- begin
- Process.kill("INT", -Process.getpgid(worker.pid))
- rescue Errno::ESRCH
- puts "Error killing #{key}"
- end
- }
- @workers = {}
- end
-
- def process_running?(command_label)
- !!workers[command_label]
+ def daemonize_app
+ Invoker.daemon.start
end
end
end