lib/rainbows/event_machine/client.rb in rainbows-2.1.0 vs lib/rainbows/event_machine/client.rb in rainbows-3.0.0
- old
+ new
@@ -1,23 +1,22 @@
# -*- encoding: binary -*-
# :enddoc:
class Rainbows::EventMachine::Client < EM::Connection
- attr_writer :body
include Rainbows::EvCore
def initialize(io)
@_io = io
- @body = nil
+ @deferred = nil
end
alias write send_data
def receive_data(data)
# To avoid clobbering the current streaming response
# (often a static file), we do not attempt to process another
# request on the same connection until the first is complete
- if @body
+ if @deferred
if data
@buf << data
@_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
end
EM.next_tick { receive_data(nil) } unless @buf.empty?
@@ -33,86 +32,80 @@
def app_call
set_comm_inactivity_timeout 0
@env[RACK_INPUT] = @input
@env[REMOTE_ADDR] = @_io.kgio_addr
- @env[ASYNC_CALLBACK] = method(:em_write_response)
+ @env[ASYNC_CALLBACK] = method(:write_async_response)
@env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
+ status, headers, body = catch(:async) {
+ APP.call(@env.merge!(RACK_DEFAULTS))
+ }
- response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
+ (nil == status || -1 == status) ? @deferred = true :
+ ev_write_response(status, headers, body, @hp.next?)
+ end
- # too tricky to support pipelining with :async since the
- # second (pipelined) request could be a stuck behind a
- # long-running async response
- (response.nil? || -1 == response[0]) and return @state = :close
-
- if @hp.next? && G.alive && G.kato > 0
- @state = :headers
- em_write_response(response, true)
- if @buf.empty?
- set_comm_inactivity_timeout(G.kato)
- elsif @body.nil?
- EM.next_tick { receive_data(nil) }
- end
- else
- em_write_response(response, false)
+ def deferred_errback(orig_body)
+ @deferred.errback do
+ orig_body.close if orig_body.respond_to?(:close)
+ quit
end
end
- def em_write_response(response, alive = false)
- status, headers, body = response
- if @hp.headers?
- headers = HH.new(headers)
- headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
- else
- headers = nil
+ def deferred_callback(orig_body, alive)
+ @deferred.callback do
+ orig_body.close if orig_body.respond_to?(:close)
+ @deferred = nil
+ alive ? receive_data(nil) : quit
end
+ end
+ def ev_write_response(status, headers, body, alive)
+ @state = :headers if alive
if body.respond_to?(:errback) && body.respond_to?(:callback)
- @body = body
- body.callback { quit }
- body.errback { quit }
- # async response, this could be a trickle as is in comet-style apps
- headers[CONNECTION] = CLOSE if headers
- alive = true
+ @deferred = body
+ deferred_errback(body)
+ deferred_callback(body, alive)
elsif body.respond_to?(:to_path)
st = File.stat(path = body.to_path)
if st.file?
- write(response_header(status, headers)) if headers
- @body = stream_file_data(path)
- @body.errback do
- body.close if body.respond_to?(:close)
- quit
- end
- @body.callback do
- body.close if body.respond_to?(:close)
- @body = nil
- alive ? receive_data(nil) : quit
- end
+ write_headers(status, headers, alive)
+ @deferred = stream_file_data(path)
+ deferred_errback(body)
+ deferred_callback(body, alive)
return
elsif st.socket? || st.pipe?
- @body = io = body_to_io(body)
- chunk = stream_response_headers(status, headers) if headers
+ io = body_to_io(@deferred = body)
+ chunk = stream_response_headers(status, headers, alive)
m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
Rainbows::EventMachine::ResponsePipe
- return EM.watch(io, m, self, alive, body).notify_readable = true
+ return EM.watch(io, m, self).notify_readable = true
end
# char or block device... WTF? fall through to body.each
end
-
- write(response_header(status, headers)) if headers
- write_body_each(self, body)
- quit unless alive
+ write_response(status, headers, body, alive)
+ if alive
+ if @deferred.nil?
+ if @buf.empty?
+ set_comm_inactivity_timeout(Rainbows.keepalive_timeout)
+ else
+ EM.next_tick { receive_data(nil) }
+ end
+ end
+ else
+ quit unless @deferred
+ end
end
def next!
- @hp.keepalive? ? receive_data(@body = nil) : quit
+ @deferred.close if @deferred.respond_to?(:close)
+ @hp.keepalive? ? receive_data(@deferred = nil) : quit
end
def unbind
async_close = @env[ASYNC_CLOSE] and async_close.succeed
- @body.respond_to?(:fail) and @body.fail
+ @deferred.respond_to?(:fail) and @deferred.fail
begin
@_io.close
rescue Errno::EBADF
# EventMachine's EventableDescriptor::Close() may close
# the underlying file descriptor without invalidating the