lib/rainbows/rev.rb in rainbows-0.2.0 vs lib/rainbows/rev.rb in rainbows-0.3.0
- old
+ new
@@ -10,53 +10,57 @@
# Implements a basic single-threaded event model with
# {Rev}[http://rev.rubyforge.org/]. It is capable of handling
# thousands of simultaneous client connections, but with only a
# single-threaded app dispatch. It is suited for slow clients and
# fast applications (applications that do not have slow network
- # dependencies). It does not require your Rack application to
- # be reentrant or thread-safe.
+ # dependencies) or applications that use DevFdResponse for deferrable
+ # response bodies. It does not require your Rack application to be
+ # thread-safe, reentrancy is only required for the DevFdResponse body
+ # generator.
#
# Compatibility: Whatever \Rev itself supports, currently Ruby
# 1.8/1.9.
#
# This model does not implement as streaming "rack.input" which
# allows the Rack application to process data as it arrives. This
# means "rack.input" will be fully buffered in memory or to a
# temporary file before the application is entered.
- #
- # Caveats: this model can buffer all output for slow clients in
- # memory. This can be a problem if your application generates large
- # responses (including static files served with Rack) as it will cause
- # the memory footprint of your process to explode. If your workers
- # seem to be eating a lot of memory from this, consider the
- # {mall}[http://bogomips.org/mall/] library which allows access to the
- # mallopt(3) function from Ruby.
module Rev
- # global vars because class/instance variables are confusing me :<
- # this struct is only accessed inside workers and thus private to each
- G = Struct.new(:nr, :max, :logger, :alive, :app).new
-
include Base
class Client < ::Rev::IO
include Unicorn
include Rainbows::Const
- G = Rainbows::Rev::G
+ G = Rainbows::G
+ # queued, optional response bodies, it should only be unpollable "fast"
+ # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk
+ # are also part of this. We'll also stick DeferredResponse bodies in
+ # here to prevent connections from being closed on us.
+ attr_reader :deferred_bodies
+
def initialize(io)
- G.nr += 1
+ G.cur += 1
super(io)
@remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST
@env = {}
@hp = HttpParser.new
@state = :headers # [ :body [ :trailers ] ] :app_call :close
@buf = ""
+ @deferred_bodies = [] # for (fast) regular files only
end
+ # graceful exit, like SIGQUIT
+ def quit
+ @deferred_bodies.clear
+ @state = :close
+ end
+
def handle_error(e)
+ quit
msg = case e
when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
ERROR_500_RESPONSE
when HttpParserError # try to tell the client they're bad
ERROR_400_RESPONSE
@@ -64,37 +68,55 @@
G.logger.error "Read error: #{e.inspect}"
G.logger.error e.backtrace.join("\n")
ERROR_500_RESPONSE
end
write(msg)
- ensure
- @state = :close
end
def app_call
- @input.rewind
- @env[RACK_INPUT] = @input
- @env[REMOTE_ADDR] = @remote_addr
- response = G.app.call(@env.update(RACK_DEFAULTS))
- alive = @hp.keepalive? && G.alive
- out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
- HttpResponse.write(self, response, out)
- if alive
- @env.clear
- @hp.reset
- @state = :headers
- else
- @state = :close
- end
+ begin
+ (@env[RACK_INPUT] = @input).rewind
+ alive = @hp.keepalive?
+ @env[REMOTE_ADDR] = @remote_addr
+ response = G.app.call(@env.update(RACK_DEFAULTS))
+ alive &&= G.alive
+ out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
+
+ DeferredResponse.write(self, response, out)
+ if alive
+ @env.clear
+ @hp.reset
+ @state = :headers
+ # keepalive requests are always body-less, so @input is unchanged
+ @hp.headers(@env, @buf) and next
+ else
+ @state = :close
+ end
+ return
+ end while true
end
def on_write_complete
- :close == @state and close
+ if body = @deferred_bodies.first
+ return if DeferredResponse === body
+ begin
+ begin
+ write(body.sysread(CHUNK_SIZE))
+ rescue EOFError # expected at file EOF
+ @deferred_bodies.shift
+ body.close
+ end
+ rescue Object => e
+ handle_error(e)
+ end
+ else
+ close if :close == @state
+ end
end
def on_close
- G.nr -= 1
+ G.cur -= 1
end
def tmpio
io = Util.tmpio
def io.size
@@ -142,33 +164,104 @@
handle_error(e)
end
end
class Server < ::Rev::IO
- G = Rainbows::Rev::G
+ G = Rainbows::G
def on_readable
- return if G.nr >= G.max
+ return if G.cur >= G.max
begin
Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default)
rescue Errno::EAGAIN, Errno::ECONNBORTED
end
end
end
+ class DeferredResponse < ::Rev::IO
+ include Unicorn
+ include Rainbows::Const
+ G = Rainbows::G
+
+ def self.defer!(client, response, out)
+ body = response.last
+ headers = Rack::Utils::HeaderHash.new(response[1])
+
+ # to_io is not part of the Rack spec, but make an exception
+ # here since we can't get here without checking to_path first
+ io = body.to_io if body.respond_to?(:to_io)
+ io ||= ::IO.new($1.to_i) if body.to_path =~ %r{\A/dev/fd/(\d+)\z}
+ io ||= File.open(File.expand_path(body.to_path), 'rb')
+ st = io.stat
+
+ if st.socket? || st.pipe?
+ do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
+ do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
+ # too tricky to support keepalive/pipelining when a response can
+ # take an indeterminate amount of time here.
+ out[0] = CONN_CLOSE
+
+ io = new(io, client, do_chunk, body).attach(::Rev::Loop.default)
+ elsif st.file?
+ headers.delete('Transfer-Encoding')
+ headers['Content-Length'] ||= st.size.to_s
+ else # char/block device, directory, whatever... nobody cares
+ return response
+ end
+ client.deferred_bodies << io
+ [ response.first, headers.to_hash, [] ]
+ end
+
+ def self.write(client, response, out)
+ response.last.respond_to?(:to_path) and
+ response = defer!(client, response, out)
+ HttpResponse.write(client, response, out)
+ end
+
+ def initialize(io, client, do_chunk, body)
+ super(io)
+ @client, @do_chunk, @body = client, do_chunk, body
+ end
+
+ def on_read(data)
+ @do_chunk and @client.write(sprintf("%x\r\n", data.size))
+ @client.write(data)
+ @do_chunk and @client.write("\r\n")
+ end
+
+ def on_close
+ @do_chunk and @client.write("0\r\n\r\n")
+ @client.quit
+ @body.respond_to?(:close) and @body.close
+ end
+ end
+
+ # This timer handles the fchmod heartbeat to prevent our master
+ # from killing us.
+ class Heartbeat < ::Rev::TimerWatcher
+ G = Rainbows::G
+
+ def initialize(tmp)
+ @m, @tmp = 0, tmp
+ super(1, true)
+ end
+
+ def on_timer
+ @tmp.chmod(@m = 0 == @m ? 1 : 0)
+ exit if (! G.alive && G.cur <= 0)
+ end
+ end
+
# runs inside each forked worker, this sits around and waits
# for connections and doesn't die until the parent dies (or is
# given a INT, QUIT, or TERM signal)
def worker_loop(worker)
init_worker_process(worker)
- G.nr = 0
- G.max = worker_connections
- G.alive = true
- G.logger = logger
- G.app = app
- LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) }
- ::Rev::Loop.default.run
+ rloop = ::Rev::Loop.default
+ Heartbeat.new(worker.tmp).attach(rloop)
+ LISTENERS.map! { |s| Server.new(s).attach(rloop) }
+ rloop.run
end
end
end