lib/rainbows/event_machine/client.rb in rainbows-4.4.3 vs lib/rainbows/event_machine/client.rb in rainbows-4.5.0
- old
+ new
@@ -8,10 +8,11 @@
@_io = io
@deferred = nil
end
alias write send_data
+ alias hijacked detach
def receive_data(data)
# To avoid clobbering the current streaming response
# (often a static file), we do not attempt to process another
# request on the same connection until the first is complete
@@ -35,13 +36,15 @@
set_comm_inactivity_timeout 0
@env[RACK_INPUT] = input
@env[REMOTE_ADDR] = @_io.kgio_addr
@env[ASYNC_CALLBACK] = method(:write_async_response)
@env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
+ @hp.hijack_setup(@env, @_io)
status, headers, body = catch(:async) {
APP.call(@env.merge!(RACK_DEFAULTS))
}
+ return hijacked if @hp.hijacked?
if (nil == status || -1 == status)
@deferred = true
else
ev_write_response(status, headers, body, @hp.next?)
@@ -65,35 +68,36 @@
end
def ev_write_response(status, headers, body, alive)
@state = :headers if alive
if body.respond_to?(:errback) && body.respond_to?(:callback)
+ write_headers(status, headers, alive, body) or return hijacked
@deferred = body
- write_headers(status, headers, alive)
write_body_each(body)
deferred_errback(body)
deferred_callback(body, alive)
return
elsif body.respond_to?(:to_path)
st = File.stat(path = body.to_path)
if st.file?
- write_headers(status, headers, alive)
+ write_headers(status, headers, alive, body) or return hijacked
@deferred = stream_file_data(path)
deferred_errback(body)
deferred_callback(body, alive)
return
elsif st.socket? || st.pipe?
+ chunk = stream_response_headers(status, headers, alive, body)
+ return hijacked if nil == chunk
io = body_to_io(@deferred = body)
- chunk = stream_response_headers(status, headers, alive)
m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
Rainbows::EventMachine::ResponsePipe
return EM.watch(io, m, self).notify_readable = true
end
# char or block device... WTF? fall through to body.each
end
- write_response(status, headers, body, alive)
+ write_response(status, headers, body, alive) or return hijacked
if alive
if @deferred.nil?
if @buf.empty?
set_comm_inactivity_timeout(KEEPALIVE_TIMEOUT)
else
@@ -110,9 +114,10 @@
@deferred = nil
@hp.keepalive? ? receive_data(nil) : quit
end
def unbind
+ return if @hp.hijacked?
async_close = @env[ASYNC_CLOSE] and async_close.succeed
@deferred.respond_to?(:fail) and @deferred.fail
begin
@_io.close
rescue Errno::EBADF