lib/rainbows/revactor.rb in rainbows-0.1.1 vs lib/rainbows/revactor.rb in rainbows-0.2.0
- old
+ new
@@ -12,16 +12,18 @@
# long-lived Actor for every listen socket in the process and spawns a
# new Actor for every client connection accept()-ed.
# +worker_connections+ will limit the number of client Actors we have
# running at any one time.
#
- # Applications using this model are required to be reentrant, but
- # generally do not have to worry about race conditions. Multiple
- # instances of the same app may run in the same address space
+ # Applications using this model are required to be reentrant, but do
+ # not have to worry about race conditions unless they use threads
+ # internally. \Rainbows! does not spawn threads under this model.
+ # Multiple instances of the same app may run in the same address space
# sequentially (but at interleaved points). Any network dependencies
# in the application using this model should be implemented using the
- # \Revactor library as well.
+ # \Revactor library as well, to take advantage of the networking
+ # concurrency features this model provides.
module Revactor
require 'rainbows/revactor/tee_input'
include Base
@@ -30,10 +32,11 @@
# in 3 easy steps: read request, call app, write app response
def process_client(client)
buf = client.read or return # this probably does not happen...
hp = HttpParser.new
env = {}
+ alive = true
remote_addr = ::Revactor::TCP::Socket === client ?
client.remote_addr : LOCALHOST
begin
while ! hp.headers(env, buf)
@@ -50,13 +53,14 @@
client.write(Const::EXPECT_100_RESPONSE)
env.delete(Const::HTTP_EXPECT)
response = app.call(env)
end
- out = [ hp.keepalive? ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
+ alive = hp.keepalive? && ! Actor.current[:quit]
+ out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
HttpResponse.write(client, response, out)
- end while hp.keepalive? and hp.reset.nil? and env.clear
+ end while alive and hp.reset.nil? and env.clear
client.close
# if we get any error, try to write something back to the client
# assuming we haven't closed the socket, but don't get hung up
# if the socket is already closed or broken. We'll always ensure
# the socket is closed at the end of this function
@@ -72,63 +76,61 @@
# runs inside each forked worker, this sits around and waits
# for connections and doesn't die until the parent dies (or is
# given a INT, QUIT, or TERM signal)
def worker_loop(worker)
- ppid = master_pid
init_worker_process(worker)
- alive = worker.tmp # tmp is our lifeline to the master process
- trap(:USR1) { reopen_worker_logs(worker.nr) }
- trap(:QUIT) { alive = false; LISTENERS.each { |s| s.close rescue nil } }
- [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
-
root = Actor.current
root.trap_exit = true
limit = worker_connections
- listeners = revactorize_listeners
- logger.info "worker=#{worker.nr} ready with Revactor"
- clients = 0
+ revactorize_listeners!
+ clients = {}
+ alive = true
- listeners.map! do |s|
+ listeners = LISTENERS.map do |s|
Actor.spawn(s) do |l|
begin
- while clients >= limit
- logger.info "busy: clients=#{clients} >= limit=#{limit}"
+ while clients.size >= limit
+ logger.info "busy: clients=#{clients.size} >= limit=#{limit}"
Actor.receive { |filter| filter.when(:resume) {} }
end
actor = Actor.spawn(l.accept) { |c| process_client(c) }
- clients += 1
+ clients[actor.object_id] = actor
root.link(actor)
rescue Errno::EAGAIN, Errno::ECONNABORTED
+ rescue Errno::EBADF
+ break
rescue Object => e
- if alive
- logger.error "Unhandled listen loop exception #{e.inspect}."
- logger.error e.backtrace.join("\n")
- end
+ listen_loop_error(e) if alive
end while alive
end
end
- nr = 0
+ m = 0
+ check_quit = lambda do
+ worker.tmp.chmod(m = 0 == m ? 1 : 0)
+ if listeners.any? { |l| l.dead? } ||
+ master_pid != Process.ppid ||
+ LISTENERS.first.nil?
+ alive = false
+ clients.each_value { |a| a[:quit] = true }
+ end
+ end
+
begin
Actor.receive do |filter|
- filter.after(1) do
- if alive
- alive.chmod(nr = 0 == nr ? 1 : 0)
- listeners.each { |l| alive = false if l.dead? }
- ppid == Process.ppid or alive = false
- end
- end
+ filter.after(timeout, &check_quit)
filter.when(Case[:exit, Actor, Object]) do |_,actor,_|
- orig = clients
- clients -= 1
+ orig = clients.size
+ clients.delete(actor.object_id)
orig >= limit and listeners.each { |l| l << :resume }
+ check_quit.call
end
end
- end while alive || clients > 0
+ end while alive || clients.size > 0
end
private
# write a response without caring if it went out or not
@@ -139,20 +141,21 @@
@_io.write_nonblock(response_str) rescue nil
end
client.close rescue nil
end
- def revactorize_listeners
- LISTENERS.map do |s|
+ def revactorize_listeners!
+ LISTENERS.map! do |s|
if TCPServer === s
::Revactor::TCP.listen(s, nil)
elsif defined?(::Revactor::UNIX) && UNIXServer === s
::Revactor::UNIX.listen(s)
else
logger.error "your version of Revactor can't handle #{s.inspect}"
nil
end
- end.compact
+ end
+ LISTENERS.compact!
end
end
end