lib/rainbows/event_machine/client.rb in rainbows-4.4.3 vs lib/rainbows/event_machine/client.rb in rainbows-4.5.0

- old
+ new

@@ -8,10 +8,11 @@ @_io = io @deferred = nil end alias write send_data + alias hijacked detach def receive_data(data) # To avoid clobbering the current streaming response # (often a static file), we do not attempt to process another # request on the same connection until the first is complete @@ -35,13 +36,15 @@ set_comm_inactivity_timeout 0 @env[RACK_INPUT] = input @env[REMOTE_ADDR] = @_io.kgio_addr @env[ASYNC_CALLBACK] = method(:write_async_response) @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new + @hp.hijack_setup(@env, @_io) status, headers, body = catch(:async) { APP.call(@env.merge!(RACK_DEFAULTS)) } + return hijacked if @hp.hijacked? if (nil == status || -1 == status) @deferred = true else ev_write_response(status, headers, body, @hp.next?) @@ -65,35 +68,36 @@ end def ev_write_response(status, headers, body, alive) @state = :headers if alive if body.respond_to?(:errback) && body.respond_to?(:callback) + write_headers(status, headers, alive, body) or return hijacked @deferred = body - write_headers(status, headers, alive) write_body_each(body) deferred_errback(body) deferred_callback(body, alive) return elsif body.respond_to?(:to_path) st = File.stat(path = body.to_path) if st.file? - write_headers(status, headers, alive) + write_headers(status, headers, alive, body) or return hijacked @deferred = stream_file_data(path) deferred_errback(body) deferred_callback(body, alive) return elsif st.socket? || st.pipe? + chunk = stream_response_headers(status, headers, alive, body) + return hijacked if nil == chunk io = body_to_io(@deferred = body) - chunk = stream_response_headers(status, headers, alive) m = chunk ? Rainbows::EventMachine::ResponseChunkPipe : Rainbows::EventMachine::ResponsePipe return EM.watch(io, m, self).notify_readable = true end # char or block device... WTF? fall through to body.each end - write_response(status, headers, body, alive) + write_response(status, headers, body, alive) or return hijacked if alive if @deferred.nil? if @buf.empty? set_comm_inactivity_timeout(KEEPALIVE_TIMEOUT) else @@ -110,9 +114,10 @@ @deferred = nil @hp.keepalive? ? receive_data(nil) : quit end def unbind + return if @hp.hijacked? async_close = @env[ASYNC_CLOSE] and async_close.succeed @deferred.respond_to?(:fail) and @deferred.fail begin @_io.close rescue Errno::EBADF