lib/zeus/server.rb in zeus-0.1.0 vs lib/zeus/server.rb in zeus-0.2.0.beta1

- old
+ new

@@ -1,298 +1,136 @@ require 'json' require 'socket' require 'rb-kqueue' + require 'zeus/process' +require 'zeus/dsl' +require 'zeus/server/file_monitor' +require 'zeus/server/client_handler' +require 'zeus/server/process_tree_monitor' +require 'zeus/server/acceptor_registration_monitor' +require 'zeus/server/acceptor' module Zeus - module Server + class Server + def self.define!(&b) - @@root = Stage.new("(root)") - @@root.instance_eval(&b) - @@files = {} + @@definition = Zeus::DSL::Evaluator.new.instance_eval(&b) end - def self.pid_has_file(pid, file) - @@files[file] ||= [] - @@files[file] << pid + def self.acceptors + @@definition.acceptors end - def self.killall_with_file(file) - pids = @@files[file] - @@process_tree.kill_nodes_with_feature(file) + attr_reader :client_handler, :acceptor_registration_monitor + def initialize + @file_monitor = FileMonitor.new(&method(:dependency_did_change)) + @acceptor_registration_monitor = AcceptorRegistrationMonitor.new + @process_tree_monitor = ProcessTreeMonitor.new + @client_handler = ClientHandler.new(acceptor_registration_monitor) + + # TODO: deprecate Zeus::Server.define! maybe. We can do that better... + @plan = @@definition.to_domain_object(self) end - TARGET_FD_LIMIT = 8192 + def dependency_did_change(file) + @process_tree_monitor.kill_nodes_with_feature(file) + end - def self.configure_number_of_file_descriptors - limit = Process.getrlimit(Process::RLIMIT_NOFILE) - if limit[0] < TARGET_FD_LIMIT && limit[1] >= TARGET_FD_LIMIT - Process.setrlimit(Process::RLIMIT_NOFILE, TARGET_FD_LIMIT) - else - puts "\x1b[33m[zeus] Warning: increase the max number of file descriptors. If you have a large project, this max cause a crash in about 10 seconds.\x1b[0m" + PID_TYPE = "P" + def w_pid line + begin + @w_msg.send(PID_TYPE + line, 0) + rescue Errno::ENOBUFS + sleep 0.2 + retry end end - def self.notify(event) - if event.flags.include?(:delete) - # file was deleted, so we need to close and reopen it. - event.watcher.disable! - begin - @@queue.watch_file(event.watcher.path, :write, :extend, :rename, :delete, &method(:notify)) - rescue Errno::ENOENT - lost_files << event.watcher.path - end + FEATURE_TYPE = "F" + def w_feature line + begin + @w_msg.send(FEATURE_TYPE + line, 0) + rescue Errno::ENOBUFS + sleep 0.2 + retry end - puts "\x1b[37m[zeus] dependency change: #{event.watcher.path}\x1b[0m" - killall_with_file(event.watcher.path) end - def self.run + def run $0 = "zeus master" - configure_number_of_file_descriptors trap("INT") { exit 0 } at_exit { Process.killall_descendants(9) } - $r_features, $w_features = IO.pipe - $w_features.sync = true + @r_msg, @w_msg = Socket.pair(:UNIX, :DGRAM) - $r_pids, $w_pids = IO.pipe - $w_pids.sync = true + # boot the actual app + @plan.run + @w_msg.close - @@process_tree = ProcessTree.new - @@root_stage_pid = @@root.run - - @@queue = KQueue::Queue.new - - lost_files = [] - - @@file_watchers = {} loop do - @@queue.poll + @file_monitor.process_events + datasources = [@r_msg, + @acceptor_registration_monitor.datasource, @client_handler.datasource] + # TODO: It would be really nice if we could put the queue poller in the select somehow. # --investigate kqueue. Is this possible? - rs, _, _ = IO.select([$r_features, $r_pids], [], [], 1) + begin + rs, _, _ = IO.select(datasources, [], [], 1) + rescue Errno::EBADF + puts "EBADF" unless defined?($asdf) + sleep 1 + $asdf = true + end rs.each do |r| case r - when $r_pids ; handle_pid_message(r.readline) - when $r_features ; handle_feature_message(r.readline) + when @acceptor_registration_monitor.datasource + @acceptor_registration_monitor.on_datasource_event + when @r_msg ; handle_messages + when @client_handler.datasource + @client_handler.on_datasource_event end end if rs end end - class ProcessTree - class Node - attr_accessor :pid, :children, :features - def initialize(pid) - @pid, @children, @features = pid, [], {} - end - - def add_child(node) - self.children << node - end - - def add_feature(feature) - self.features[feature] = true - end - - def has_feature?(feature) - self.features[feature] == true - end - - def inspect - "(#{pid}:#{features.size}:[#{children.map(&:inspect).join(",")}])" - end - - end - - def inspect - @root.inspect - end - - def initialize - @root = Node.new(Process.pid) - @nodes_by_pid = {Process.pid => @root} - end - - def node_for_pid(pid) - @nodes_by_pid[pid.to_i] ||= Node.new(pid.to_i) - end - - def process_has_parent(pid, ppid) - curr = node_for_pid(pid) - base = node_for_pid(ppid) - base.add_child(curr) - end - - def process_has_feature(pid, feature) - node = node_for_pid(pid) - node.add_feature(feature) - end - - def kill_node(node) - @nodes_by_pid.delete(node.pid) - # recall that this process explicitly traps INT -> exit 0 - Process.kill("INT", node.pid) - end - - def kill_nodes_with_feature(file, base = @root) - if base.has_feature?(file) - if base == @root.children[0] || base == @root - puts "\x1b[31mOne of zeus's dependencies changed. Not killing zeus. You may have to restart the server.\x1b[0m" - return false + def handle_messages + loop do + begin + data = @r_msg.recv_nonblock(1024) + case data[0] + when FEATURE_TYPE + handle_feature_message(data[1..-1]) + when PID_TYPE + handle_pid_message(data[1..-1]) + else + raise "Unrecognized message" end - kill_node(base) - return true - else - base.children.dup.each do |node| - if kill_nodes_with_feature(file, node) - base.children.delete(node) - end - end - return false + rescue Errno::EAGAIN + break end end - end - def self.handle_pid_message(data) + def handle_pid_message(data) data =~ /(\d+):(\d+)/ - pid, ppid = $1.to_i, $2.to_i - @@process_tree.process_has_parent(pid, ppid) + pid, ppid = $1.to_i, $2.to_i + @process_tree_monitor.process_has_parent(pid, ppid) end - def self.handle_feature_message(data) + def handle_feature_message(data) data =~ /(\d+):(.*)/ - pid, file = $1.to_i, $2 - @@process_tree.process_has_feature(pid, file) - return if @@file_watchers[file] - begin - @@file_watchers[file] = true - @@queue.watch_file(file.chomp, :write, :extend, :rename, :delete, &method(:notify)) - # rescue Errno::EMFILE - # exit 1 - rescue Errno::ENOENT - puts "No file found at #{file.chomp}" - end + pid, file = $1.to_i, $2 + @process_tree_monitor.process_has_feature(pid, file) + @file_monitor.watch(file) end - class Stage - attr_reader :pid - def initialize(name) - @name = name - @stages, @actions = [], [] - end - - def action(&b) - @actions << b - end - - def stage(name, &b) - @stages << Stage.new(name).tap { |s| s.instance_eval(&b) } - end - - def acceptor(name, socket, &b) - @stages << Acceptor.new(name, socket, &b) - end - - # There are a few things we want to accomplish: - # 1. Running all the actions (each time this stage is killed and restarted) - # 2. Starting all the substages (and restarting them when necessary) - # 3. Starting all the acceptors (and restarting them when necessary) - def run - @pid = fork { - $0 = "zeus spawner: #{@name}" - pid = Process.pid - $w_pids.puts "#{pid}:#{Process.ppid}\n" - puts "\x1b[35m[zeus] starting spawner `#{@name}`\x1b[0m" - trap("INT") { - puts "\x1b[35m[zeus] killing spawner `#{@name}`\x1b[0m" - exit 0 - } - - @actions.each(&:call) - - $LOADED_FEATURES.each do |f| - $w_features.puts "#{pid}:#{f}\n" - end - - pids = {} - @stages.each do |stage| - pids[stage.run] = stage - end - - loop do - begin - pid = Process.wait - rescue Errno::ECHILD - raise "Stage `#{@name}` has no children. All terminal nodes must be acceptors" - end - if (status = $?.exitstatus) > 0 - exit status - else # restart the stage that died. - stage = pids[pid] - pids[stage.run] = stage - end - end - - } - end - - end - - class Acceptor - attr_reader :pid - def initialize(name, socket, &b) - @name = name - @socket = socket - @action = b - end - - def run - @pid = fork { - $0 = "zeus acceptor: #{@name}" - pid = Process.pid - $w_pids.puts "#{pid}:#{Process.ppid}\n" - $LOADED_FEATURES.each do |f| - $w_features.puts "#{pid}:#{f}\n" - end - puts "\x1b[35m[zeus] starting acceptor `#{@name}`\x1b[0m" - trap("INT") { - puts "\x1b[35m[zeus] killing acceptor `#{@name}`\x1b[0m" - exit 0 - } - - File.unlink(@socket) rescue nil - server = UNIXServer.new(@socket) - loop do - ActiveRecord::Base.clear_all_connections! # TODO : refactor - client = server.accept - child = fork do - ActiveRecord::Base.establish_connection # TODO :refactor - ActiveSupport::DescendantsTracker.clear - ActiveSupport::Dependencies.clear - - terminal = client.recv_io - arguments = JSON.load(client.gets.strip) - - client << $$ << "\n" - $stdin.reopen(terminal) - $stdout.reopen(terminal) - $stderr.reopen(terminal) - ARGV.replace(arguments) - - @action.call - end - Process.detach(child) - client.close - end - } - end - + def self.pid_has_file(pid, file) + @@files[file] ||= [] + @@files[file] << pid end end end