lib/invoker/commander.rb in invoker-0.0.3 vs lib/invoker/commander.rb in invoker-0.1.1.pre
- old
+ new
@@ -1,117 +1,198 @@
require "io/console"
require 'pty'
+require "json"
module Invoker
class Commander
MAX_PROCESS_COUNT = 10
LABEL_COLORS = ['green', 'yellow', 'blue', 'magenta', 'cyan']
attr_accessor :reactor, :workers, :thread_group, :open_pipes
+ attr_accessor :event_manager, :runnables
def initialize
# mapping between open pipes and worker classes
@open_pipes = {}
# mapping between command label and worker classes
@workers = {}
@thread_group = ThreadGroup.new()
@worker_mutex = Mutex.new()
+
+ @event_manager = Invoker::Event::Manager.new()
+ @runnables = []
+
@reactor = Invoker::Reactor.new
Thread.abort_on_exception = true
end
+ # Start the invoker process supervisor. This method starts a unix server
+ # in separate thread that listens for incoming commands.
def start_manager
if !Invoker::CONFIG.processes || Invoker::CONFIG.processes.empty?
raise Invoker::Errors::InvalidConfig.new("No processes configured in config file")
end
install_interrupt_handler()
unix_server_thread = Thread.new { Invoker::CommandListener::Server.new() }
thread_group.add(unix_server_thread)
Invoker::CONFIG.processes.each { |process_info| add_command(process_info) }
- reactor.start
+ start_event_loop()
end
+ # Start given command and start a background thread to wait on the process
+ #
+ # @param process_info [OpenStruct(command, directory)]
def add_command(process_info)
m, s = PTY.open
s.raw! # disable newline conversion.
pid = run_command(process_info, s)
s.close()
- selected_color = LABEL_COLORS.shift()
- LABEL_COLORS.push(selected_color)
- worker = Invoker::CommandWorker.new(process_info.label, m, pid, selected_color)
+ worker = Invoker::CommandWorker.new(process_info.label, m, pid, select_color())
add_worker(worker)
wait_on_pid(process_info.label,pid)
end
+ # List currently running commands
+ def list_commands
+ Invoker::ProcessPrinter.to_json(workers)
+ end
+
+ # Start executing given command by their label name.
+ #
+ # @param command_label [String] Command label of process specified in config file.
def add_command_by_label(command_label)
process_info = Invoker::CONFIG.processes.detect {|pconfig|
pconfig.label == command_label
}
if process_info
add_command(process_info)
end
end
- def reload_command(command_label)
- remove_command(command_label)
- add_command_by_label(command_label)
+ # Reload a process given by command label
+ #
+ # @params command_label [String] Command label of process specified in config file.
+ def reload_command(command_label, rest_args)
+ if remove_command(command_label, rest_args)
+ event_manager.schedule_event(command_label, :worker_removed) {
+ add_command_by_label(command_label)
+ }
+ else
+ add_command_by_label(command_label)
+ end
end
+ # Remove a process from list of processes managed by invoker supervisor.It also
+ # kills the process before removing it from the list.
+ #
+ # @param command_label [String] Command label of process specified in config file
+ # @param rest_args [Array] Additional option arguments, such as signal that can be used.
+ # @return [Boolean] if process existed and was removed else false
def remove_command(command_label, rest_args)
worker = workers[command_label]
+ return false unless worker
signal_to_use = rest_args ? Array(rest_args).first : 'INT'
- if worker
- $stdout.puts("Removing #{command_label} with signal #{signal_to_use}".red)
- process_kill(worker.pid, signal_to_use)
- end
+ Invoker::Logger.puts("Removing #{command_label} with signal #{signal_to_use}".red)
+ kill_or_remove_process(worker.pid, signal_to_use, command_label)
end
+ # Given a file descriptor returns the worker object
+ #
+ # @param fd [IO] an IO object with valid file descriptor
+ # @return [Invoker::CommandWorker] The worker object which is associated with this fd
def get_worker_from_fd(fd)
open_pipes[fd.fileno]
end
+ # Given a command label returns the associated worker object
+ #
+ # @param label [String] Command label of the command
+ # @return [Invoker::CommandWorker] The worker object which is associated with this command
def get_worker_from_label(label)
workers[label]
end
+
+ def on_next_tick(*args, &block)
+ @worker_mutex.synchronize do
+ @runnables << OpenStruct.new(:args => args, :block => block)
+ end
+ end
+
+ def run_runnables
+ @runnables.each do |runnable|
+ instance_exec(*runnable.args, &runnable.block)
+ end
+ @runnables = []
+ end
private
+ def start_event_loop
+ loop do
+ reactor.watch_on_pipe()
+ run_runnables()
+ run_scheduled_events()
+ end
+ end
+
+ def run_scheduled_events
+ event_manager.run_scheduled_events do |event|
+ event.block.call()
+ end
+ end
+
+ def kill_or_remove_process(pid, signal_to_use, command_label)
+ process_kill(pid, signal_to_use)
+ true
+ rescue Errno::ESRCH
+ remove_worker(command_label, false)
+ false
+ end
+
def process_kill(pid, signal_to_use)
if signal_to_use.to_i == 0
Process.kill(signal_to_use, pid)
else
Process.kill(signal_to_use.to_i, pid)
end
end
+
+ def select_color
+ selected_color = LABEL_COLORS.shift()
+ LABEL_COLORS.push(selected_color)
+ selected_color
+ end
# Remove worker from all collections
- def remove_worker(command_label)
- @worker_mutex.synchronize do
- worker = @workers[command_label]
- if worker
- @open_pipes.delete(worker.pipe_end.fileno)
- @reactor.remove_from_monitoring(worker.pipe_end)
- @workers.delete(command_label)
- end
+ def remove_worker(command_label, trigger_event = true)
+ worker = @workers[command_label]
+ if worker
+ @open_pipes.delete(worker.pipe_end.fileno)
+ @workers.delete(command_label)
end
+ if trigger_event
+ event_manager.trigger(command_label, :worker_removed)
+ end
end
# add worker to global collections
def add_worker(worker)
- @worker_mutex.synchronize do
- @open_pipes[worker.pipe_end.fileno] = worker
- @workers[worker.command_label] = worker
- @reactor.add_to_monitor(worker.pipe_end)
- end
+ @open_pipes[worker.pipe_end.fileno] = worker
+ @workers[worker.command_label] = worker
+ @reactor.add_to_monitor(worker.pipe_end)
end
def run_command(process_info, write_pipe)
+ command_label = process_info.label
+
+ event_manager.schedule_event(command_label, :exit) { remove_worker(command_label) }
+
if defined?(Bundler)
Bundler.with_clean_env do
spawn(process_info.cmd,
:chdir => process_info.dir || "/", :out => write_pipe, :err => write_pipe
)
@@ -123,15 +204,16 @@
end
end
def wait_on_pid(command_label,pid)
raise Invoker::Errors::ToomanyOpenConnections if @thread_group.enclosed?
+
thread = Thread.new do
Process.wait(pid)
message = "Process with command #{command_label} exited with status #{$?.exitstatus}"
- $stdout.puts("\n#{message}".red)
+ Invoker::Logger.puts("\n#{message}".red)
notify_user(message)
- remove_worker(command_label)
+ event_manager.trigger(command_label, :exit)
end
@thread_group.add(thread)
end
def notify_user(message)