lib/rainbows/revactor.rb in rainbows-0.1.1 vs lib/rainbows/revactor.rb in rainbows-0.2.0

- old
+ new

@@ -12,16 +12,18 @@ # long-lived Actor for every listen socket in the process and spawns a # new Actor for every client connection accept()-ed. # +worker_connections+ will limit the number of client Actors we have # running at any one time. # - # Applications using this model are required to be reentrant, but - # generally do not have to worry about race conditions. Multiple - # instances of the same app may run in the same address space + # Applications using this model are required to be reentrant, but do + # not have to worry about race conditions unless they use threads + # internally. \Rainbows! does not spawn threads under this model. + # Multiple instances of the same app may run in the same address space # sequentially (but at interleaved points). Any network dependencies # in the application using this model should be implemented using the - # \Revactor library as well. + # \Revactor library as well, to take advantage of the networking + # concurrency features this model provides. module Revactor require 'rainbows/revactor/tee_input' include Base @@ -30,10 +32,11 @@ # in 3 easy steps: read request, call app, write app response def process_client(client) buf = client.read or return # this probably does not happen... hp = HttpParser.new env = {} + alive = true remote_addr = ::Revactor::TCP::Socket === client ? client.remote_addr : LOCALHOST begin while ! hp.headers(env, buf) @@ -50,13 +53,14 @@ client.write(Const::EXPECT_100_RESPONSE) env.delete(Const::HTTP_EXPECT) response = app.call(env) end - out = [ hp.keepalive? ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? + alive = hp.keepalive? && ! Actor.current[:quit] + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? HttpResponse.write(client, response, out) - end while hp.keepalive? and hp.reset.nil? and env.clear + end while alive and hp.reset.nil? and env.clear client.close # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up # if the socket is already closed or broken. We'll always ensure # the socket is closed at the end of this function @@ -72,63 +76,61 @@ # runs inside each forked worker, this sits around and waits # for connections and doesn't die until the parent dies (or is # given a INT, QUIT, or TERM signal) def worker_loop(worker) - ppid = master_pid init_worker_process(worker) - alive = worker.tmp # tmp is our lifeline to the master process - trap(:USR1) { reopen_worker_logs(worker.nr) } - trap(:QUIT) { alive = false; LISTENERS.each { |s| s.close rescue nil } } - [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown - root = Actor.current root.trap_exit = true limit = worker_connections - listeners = revactorize_listeners - logger.info "worker=#{worker.nr} ready with Revactor" - clients = 0 + revactorize_listeners! + clients = {} + alive = true - listeners.map! do |s| + listeners = LISTENERS.map do |s| Actor.spawn(s) do |l| begin - while clients >= limit - logger.info "busy: clients=#{clients} >= limit=#{limit}" + while clients.size >= limit + logger.info "busy: clients=#{clients.size} >= limit=#{limit}" Actor.receive { |filter| filter.when(:resume) {} } end actor = Actor.spawn(l.accept) { |c| process_client(c) } - clients += 1 + clients[actor.object_id] = actor root.link(actor) rescue Errno::EAGAIN, Errno::ECONNABORTED + rescue Errno::EBADF + break rescue Object => e - if alive - logger.error "Unhandled listen loop exception #{e.inspect}." - logger.error e.backtrace.join("\n") - end + listen_loop_error(e) if alive end while alive end end - nr = 0 + m = 0 + check_quit = lambda do + worker.tmp.chmod(m = 0 == m ? 1 : 0) + if listeners.any? { |l| l.dead? } || + master_pid != Process.ppid || + LISTENERS.first.nil? + alive = false + clients.each_value { |a| a[:quit] = true } + end + end + begin Actor.receive do |filter| - filter.after(1) do - if alive - alive.chmod(nr = 0 == nr ? 1 : 0) - listeners.each { |l| alive = false if l.dead? } - ppid == Process.ppid or alive = false - end - end + filter.after(timeout, &check_quit) filter.when(Case[:exit, Actor, Object]) do |_,actor,_| - orig = clients - clients -= 1 + orig = clients.size + clients.delete(actor.object_id) orig >= limit and listeners.each { |l| l << :resume } + check_quit.call end end - end while alive || clients > 0 + end while alive || clients.size > 0 end private # write a response without caring if it went out or not @@ -139,20 +141,21 @@ @_io.write_nonblock(response_str) rescue nil end client.close rescue nil end - def revactorize_listeners - LISTENERS.map do |s| + def revactorize_listeners! + LISTENERS.map! do |s| if TCPServer === s ::Revactor::TCP.listen(s, nil) elsif defined?(::Revactor::UNIX) && UNIXServer === s ::Revactor::UNIX.listen(s) else logger.error "your version of Revactor can't handle #{s.inspect}" nil end - end.compact + end + LISTENERS.compact! end end end