lib/zeus/server.rb in zeus-0.1.0 vs lib/zeus/server.rb in zeus-0.2.0.beta1
- old
+ new
@@ -1,298 +1,136 @@
require 'json'
require 'socket'
require 'rb-kqueue'
+
require 'zeus/process'
+require 'zeus/dsl'
+require 'zeus/server/file_monitor'
+require 'zeus/server/client_handler'
+require 'zeus/server/process_tree_monitor'
+require 'zeus/server/acceptor_registration_monitor'
+require 'zeus/server/acceptor'
module Zeus
- module Server
+ class Server
+
def self.define!(&b)
- @@root = Stage.new("(root)")
- @@root.instance_eval(&b)
- @@files = {}
+ @@definition = Zeus::DSL::Evaluator.new.instance_eval(&b)
end
- def self.pid_has_file(pid, file)
- @@files[file] ||= []
- @@files[file] << pid
+ def self.acceptors
+ @@definition.acceptors
end
- def self.killall_with_file(file)
- pids = @@files[file]
- @@process_tree.kill_nodes_with_feature(file)
+ attr_reader :client_handler, :acceptor_registration_monitor
+ def initialize
+ @file_monitor = FileMonitor.new(&method(:dependency_did_change))
+ @acceptor_registration_monitor = AcceptorRegistrationMonitor.new
+ @process_tree_monitor = ProcessTreeMonitor.new
+ @client_handler = ClientHandler.new(acceptor_registration_monitor)
+
+ # TODO: deprecate Zeus::Server.define! maybe. We can do that better...
+ @plan = @@definition.to_domain_object(self)
end
- TARGET_FD_LIMIT = 8192
+ def dependency_did_change(file)
+ @process_tree_monitor.kill_nodes_with_feature(file)
+ end
- def self.configure_number_of_file_descriptors
- limit = Process.getrlimit(Process::RLIMIT_NOFILE)
- if limit[0] < TARGET_FD_LIMIT && limit[1] >= TARGET_FD_LIMIT
- Process.setrlimit(Process::RLIMIT_NOFILE, TARGET_FD_LIMIT)
- else
- puts "\x1b[33m[zeus] Warning: increase the max number of file descriptors. If you have a large project, this max cause a crash in about 10 seconds.\x1b[0m"
+ PID_TYPE = "P"
+ def w_pid line
+ begin
+ @w_msg.send(PID_TYPE + line, 0)
+ rescue Errno::ENOBUFS
+ sleep 0.2
+ retry
end
end
- def self.notify(event)
- if event.flags.include?(:delete)
- # file was deleted, so we need to close and reopen it.
- event.watcher.disable!
- begin
- @@queue.watch_file(event.watcher.path, :write, :extend, :rename, :delete, &method(:notify))
- rescue Errno::ENOENT
- lost_files << event.watcher.path
- end
+ FEATURE_TYPE = "F"
+ def w_feature line
+ begin
+ @w_msg.send(FEATURE_TYPE + line, 0)
+ rescue Errno::ENOBUFS
+ sleep 0.2
+ retry
end
- puts "\x1b[37m[zeus] dependency change: #{event.watcher.path}\x1b[0m"
- killall_with_file(event.watcher.path)
end
- def self.run
+ def run
$0 = "zeus master"
- configure_number_of_file_descriptors
trap("INT") { exit 0 }
at_exit { Process.killall_descendants(9) }
- $r_features, $w_features = IO.pipe
- $w_features.sync = true
+ @r_msg, @w_msg = Socket.pair(:UNIX, :DGRAM)
- $r_pids, $w_pids = IO.pipe
- $w_pids.sync = true
+ # boot the actual app
+ @plan.run
+ @w_msg.close
- @@process_tree = ProcessTree.new
- @@root_stage_pid = @@root.run
-
- @@queue = KQueue::Queue.new
-
- lost_files = []
-
- @@file_watchers = {}
loop do
- @@queue.poll
+ @file_monitor.process_events
+ datasources = [@r_msg,
+ @acceptor_registration_monitor.datasource, @client_handler.datasource]
+
# TODO: It would be really nice if we could put the queue poller in the select somehow.
# --investigate kqueue. Is this possible?
- rs, _, _ = IO.select([$r_features, $r_pids], [], [], 1)
+ begin
+ rs, _, _ = IO.select(datasources, [], [], 1)
+ rescue Errno::EBADF
+ puts "EBADF" unless defined?($asdf)
+ sleep 1
+ $asdf = true
+ end
rs.each do |r|
case r
- when $r_pids ; handle_pid_message(r.readline)
- when $r_features ; handle_feature_message(r.readline)
+ when @acceptor_registration_monitor.datasource
+ @acceptor_registration_monitor.on_datasource_event
+ when @r_msg ; handle_messages
+ when @client_handler.datasource
+ @client_handler.on_datasource_event
end
end if rs
end
end
- class ProcessTree
- class Node
- attr_accessor :pid, :children, :features
- def initialize(pid)
- @pid, @children, @features = pid, [], {}
- end
-
- def add_child(node)
- self.children << node
- end
-
- def add_feature(feature)
- self.features[feature] = true
- end
-
- def has_feature?(feature)
- self.features[feature] == true
- end
-
- def inspect
- "(#{pid}:#{features.size}:[#{children.map(&:inspect).join(",")}])"
- end
-
- end
-
- def inspect
- @root.inspect
- end
-
- def initialize
- @root = Node.new(Process.pid)
- @nodes_by_pid = {Process.pid => @root}
- end
-
- def node_for_pid(pid)
- @nodes_by_pid[pid.to_i] ||= Node.new(pid.to_i)
- end
-
- def process_has_parent(pid, ppid)
- curr = node_for_pid(pid)
- base = node_for_pid(ppid)
- base.add_child(curr)
- end
-
- def process_has_feature(pid, feature)
- node = node_for_pid(pid)
- node.add_feature(feature)
- end
-
- def kill_node(node)
- @nodes_by_pid.delete(node.pid)
- # recall that this process explicitly traps INT -> exit 0
- Process.kill("INT", node.pid)
- end
-
- def kill_nodes_with_feature(file, base = @root)
- if base.has_feature?(file)
- if base == @root.children[0] || base == @root
- puts "\x1b[31mOne of zeus's dependencies changed. Not killing zeus. You may have to restart the server.\x1b[0m"
- return false
+ def handle_messages
+ loop do
+ begin
+ data = @r_msg.recv_nonblock(1024)
+ case data[0]
+ when FEATURE_TYPE
+ handle_feature_message(data[1..-1])
+ when PID_TYPE
+ handle_pid_message(data[1..-1])
+ else
+ raise "Unrecognized message"
end
- kill_node(base)
- return true
- else
- base.children.dup.each do |node|
- if kill_nodes_with_feature(file, node)
- base.children.delete(node)
- end
- end
- return false
+ rescue Errno::EAGAIN
+ break
end
end
-
end
- def self.handle_pid_message(data)
+ def handle_pid_message(data)
data =~ /(\d+):(\d+)/
- pid, ppid = $1.to_i, $2.to_i
- @@process_tree.process_has_parent(pid, ppid)
+ pid, ppid = $1.to_i, $2.to_i
+ @process_tree_monitor.process_has_parent(pid, ppid)
end
- def self.handle_feature_message(data)
+ def handle_feature_message(data)
data =~ /(\d+):(.*)/
- pid, file = $1.to_i, $2
- @@process_tree.process_has_feature(pid, file)
- return if @@file_watchers[file]
- begin
- @@file_watchers[file] = true
- @@queue.watch_file(file.chomp, :write, :extend, :rename, :delete, &method(:notify))
- # rescue Errno::EMFILE
- # exit 1
- rescue Errno::ENOENT
- puts "No file found at #{file.chomp}"
- end
+ pid, file = $1.to_i, $2
+ @process_tree_monitor.process_has_feature(pid, file)
+ @file_monitor.watch(file)
end
- class Stage
- attr_reader :pid
- def initialize(name)
- @name = name
- @stages, @actions = [], []
- end
-
- def action(&b)
- @actions << b
- end
-
- def stage(name, &b)
- @stages << Stage.new(name).tap { |s| s.instance_eval(&b) }
- end
-
- def acceptor(name, socket, &b)
- @stages << Acceptor.new(name, socket, &b)
- end
-
- # There are a few things we want to accomplish:
- # 1. Running all the actions (each time this stage is killed and restarted)
- # 2. Starting all the substages (and restarting them when necessary)
- # 3. Starting all the acceptors (and restarting them when necessary)
- def run
- @pid = fork {
- $0 = "zeus spawner: #{@name}"
- pid = Process.pid
- $w_pids.puts "#{pid}:#{Process.ppid}\n"
- puts "\x1b[35m[zeus] starting spawner `#{@name}`\x1b[0m"
- trap("INT") {
- puts "\x1b[35m[zeus] killing spawner `#{@name}`\x1b[0m"
- exit 0
- }
-
- @actions.each(&:call)
-
- $LOADED_FEATURES.each do |f|
- $w_features.puts "#{pid}:#{f}\n"
- end
-
- pids = {}
- @stages.each do |stage|
- pids[stage.run] = stage
- end
-
- loop do
- begin
- pid = Process.wait
- rescue Errno::ECHILD
- raise "Stage `#{@name}` has no children. All terminal nodes must be acceptors"
- end
- if (status = $?.exitstatus) > 0
- exit status
- else # restart the stage that died.
- stage = pids[pid]
- pids[stage.run] = stage
- end
- end
-
- }
- end
-
- end
-
- class Acceptor
- attr_reader :pid
- def initialize(name, socket, &b)
- @name = name
- @socket = socket
- @action = b
- end
-
- def run
- @pid = fork {
- $0 = "zeus acceptor: #{@name}"
- pid = Process.pid
- $w_pids.puts "#{pid}:#{Process.ppid}\n"
- $LOADED_FEATURES.each do |f|
- $w_features.puts "#{pid}:#{f}\n"
- end
- puts "\x1b[35m[zeus] starting acceptor `#{@name}`\x1b[0m"
- trap("INT") {
- puts "\x1b[35m[zeus] killing acceptor `#{@name}`\x1b[0m"
- exit 0
- }
-
- File.unlink(@socket) rescue nil
- server = UNIXServer.new(@socket)
- loop do
- ActiveRecord::Base.clear_all_connections! # TODO : refactor
- client = server.accept
- child = fork do
- ActiveRecord::Base.establish_connection # TODO :refactor
- ActiveSupport::DescendantsTracker.clear
- ActiveSupport::Dependencies.clear
-
- terminal = client.recv_io
- arguments = JSON.load(client.gets.strip)
-
- client << $$ << "\n"
- $stdin.reopen(terminal)
- $stdout.reopen(terminal)
- $stderr.reopen(terminal)
- ARGV.replace(arguments)
-
- @action.call
- end
- Process.detach(child)
- client.close
- end
- }
- end
-
+ def self.pid_has_file(pid, file)
+ @@files[file] ||= []
+ @@files[file] << pid
end
end
end