lib/rainbows/event_machine/client.rb in rainbows-2.1.0 vs lib/rainbows/event_machine/client.rb in rainbows-3.0.0

- old
+ new

@@ -1,23 +1,22 @@ # -*- encoding: binary -*- # :enddoc: class Rainbows::EventMachine::Client < EM::Connection - attr_writer :body include Rainbows::EvCore def initialize(io) @_io = io - @body = nil + @deferred = nil end alias write send_data def receive_data(data) # To avoid clobbering the current streaming response # (often a static file), we do not attempt to process another # request on the same connection until the first is complete - if @body + if @deferred if data @buf << data @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000 end EM.next_tick { receive_data(nil) } unless @buf.empty? @@ -33,86 +32,80 @@ def app_call set_comm_inactivity_timeout 0 @env[RACK_INPUT] = @input @env[REMOTE_ADDR] = @_io.kgio_addr - @env[ASYNC_CALLBACK] = method(:em_write_response) + @env[ASYNC_CALLBACK] = method(:write_async_response) @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new + status, headers, body = catch(:async) { + APP.call(@env.merge!(RACK_DEFAULTS)) + } - response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } + (nil == status || -1 == status) ? @deferred = true : + ev_write_response(status, headers, body, @hp.next?) + end - # too tricky to support pipelining with :async since the - # second (pipelined) request could be a stuck behind a - # long-running async response - (response.nil? || -1 == response[0]) and return @state = :close - - if @hp.next? && G.alive && G.kato > 0 - @state = :headers - em_write_response(response, true) - if @buf.empty? - set_comm_inactivity_timeout(G.kato) - elsif @body.nil? - EM.next_tick { receive_data(nil) } - end - else - em_write_response(response, false) + def deferred_errback(orig_body) + @deferred.errback do + orig_body.close if orig_body.respond_to?(:close) + quit end end - def em_write_response(response, alive = false) - status, headers, body = response - if @hp.headers? - headers = HH.new(headers) - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE - else - headers = nil + def deferred_callback(orig_body, alive) + @deferred.callback do + orig_body.close if orig_body.respond_to?(:close) + @deferred = nil + alive ? receive_data(nil) : quit end + end + def ev_write_response(status, headers, body, alive) + @state = :headers if alive if body.respond_to?(:errback) && body.respond_to?(:callback) - @body = body - body.callback { quit } - body.errback { quit } - # async response, this could be a trickle as is in comet-style apps - headers[CONNECTION] = CLOSE if headers - alive = true + @deferred = body + deferred_errback(body) + deferred_callback(body, alive) elsif body.respond_to?(:to_path) st = File.stat(path = body.to_path) if st.file? - write(response_header(status, headers)) if headers - @body = stream_file_data(path) - @body.errback do - body.close if body.respond_to?(:close) - quit - end - @body.callback do - body.close if body.respond_to?(:close) - @body = nil - alive ? receive_data(nil) : quit - end + write_headers(status, headers, alive) + @deferred = stream_file_data(path) + deferred_errback(body) + deferred_callback(body, alive) return elsif st.socket? || st.pipe? - @body = io = body_to_io(body) - chunk = stream_response_headers(status, headers) if headers + io = body_to_io(@deferred = body) + chunk = stream_response_headers(status, headers, alive) m = chunk ? Rainbows::EventMachine::ResponseChunkPipe : Rainbows::EventMachine::ResponsePipe - return EM.watch(io, m, self, alive, body).notify_readable = true + return EM.watch(io, m, self).notify_readable = true end # char or block device... WTF? fall through to body.each end - - write(response_header(status, headers)) if headers - write_body_each(self, body) - quit unless alive + write_response(status, headers, body, alive) + if alive + if @deferred.nil? + if @buf.empty? + set_comm_inactivity_timeout(Rainbows.keepalive_timeout) + else + EM.next_tick { receive_data(nil) } + end + end + else + quit unless @deferred + end end def next! - @hp.keepalive? ? receive_data(@body = nil) : quit + @deferred.close if @deferred.respond_to?(:close) + @hp.keepalive? ? receive_data(@deferred = nil) : quit end def unbind async_close = @env[ASYNC_CLOSE] and async_close.succeed - @body.respond_to?(:fail) and @body.fail + @deferred.respond_to?(:fail) and @deferred.fail begin @_io.close rescue Errno::EBADF # EventMachine's EventableDescriptor::Close() may close # the underlying file descriptor without invalidating the