lib/faye/adapters/rack_adapter.rb in faye-0.7.2 vs lib/faye/adapters/rack_adapter.rb in faye-0.8.0

- old
+ new

@@ -4,11 +4,10 @@ include Logging extend Forwardable def_delegators "@server.engine", :bind, :unbind - # Only supported under Thin ASYNC_RESPONSE = [-1, {}, []].freeze DEFAULT_ENDPOINT = '/bayeux' SCRIPT_PATH = File.join(ROOT, 'faye-browser-min.js') @@ -39,10 +38,11 @@ def get_client @client ||= Client.new(@server) end def listen(port, ssl_options = nil) + Faye::WebSocket.load_adapter('thin') handler = Rack::Handler.get('thin') handler.run(self, :Port => port) do |s| if ssl_options s.ssl = true s.ssl_options = { @@ -70,11 +70,12 @@ [404, TYPE_TEXT, ["Sure you're not looking for #{@endpoint} ?"]] end return serve_client_script(env) if request.path_info =~ /\.js$/ return handle_options(request) if env['REQUEST_METHOD'] == 'OPTIONS' - return handle_upgrade(request) if env['HTTP_CONNECTION'] =~ /\bUpgrade\b/i + return handle_websocket(env) if Faye::WebSocket.websocket?(env) + return handle_eventsource(env) if Faye::EventSource.eventsource?(env) handle_request(request) end private @@ -99,53 +100,78 @@ end end def handle_request(request) json_msg = message_from_request(request) - message = JSON.parse(json_msg) + message = Yajl::Parser.parse(json_msg) jsonp = request.params['jsonp'] || JSONP_CALLBACK headers = request.get? ? TYPE_SCRIPT.dup : TYPE_JSON.dup origin = request.env['HTTP_ORIGIN'] callback = request.env['async.callback'] - body = DeferredBody.new debug 'Received ?: ?', request.env['REQUEST_METHOD'], json_msg @server.flush_connection(message) if request.get? headers['Access-Control-Allow-Origin'] = origin if origin headers['Cache-Control'] = 'no-cache, no-store' if request.get? - callback.call [200, headers, body] @server.process(message, false) do |replies| - response = JSON.unparse(replies) + response = Faye.to_json(replies) response = "#{ jsonp }(#{ response });" if request.get? debug 'Returning ?', response - body.succeed(response) + callback.call [200, headers, [response]] end ASYNC_RESPONSE - rescue + rescue => e + error "#{e.message}\nBacktrace:\n#{e.backtrace * "\n"}" [400, TYPE_TEXT, ['Bad request']] end - def handle_upgrade(request) - socket = Faye::WebSocket.new(request.env) + def handle_websocket(env) + ws = Faye::WebSocket.new(env, nil, :ping => @options[:ping]) + client_id = nil - socket.onmessage = lambda do |message| + ws.onmessage = lambda do |event| begin - message = JSON.parse(message.data) - debug "Received via WebSocket[#{socket.version}]: ?", message + message = Yajl::Parser.parse(event.data) + client_id = [message].flatten[0]['clientId'] + + debug "Received via WebSocket[#{ws.version}]: ?", message + @server.open_socket(client_id, ws) + @server.process(message, false) do |replies| - debug "Sending via WebSocket[#{socket.version}]: ?", replies - socket.send(JSON.unparse(replies)) + ws.send(Faye.to_json(replies)) if ws end - rescue + rescue => e + error "#{e.message}\nBacktrace:\n#{e.backtrace * "\n"}" end end - ASYNC_RESPONSE + + ws.onclose = lambda do |event| + @server.close_socket(client_id) + ws = nil + end + + ws.rack_response end + def handle_eventsource(env) + es = Faye::EventSource.new(env, :ping => @options[:ping]) + client_id = es.url.split('/').pop + + debug 'Opened EventSource connection for ?', client_id + @server.open_socket(client_id, es) + + es.onclose = lambda do |event| + @server.close_socket(client_id) + es = nil + end + + es.rack_response + end + def message_from_request(request) message = request.params['message'] return message if message # Some clients do not send a content-type, e.g. @@ -167,14 +193,9 @@ 'Access-Control-Max-Age' => '86400', 'Access-Control-Allow-Methods' => 'POST, GET, PUT, DELETE, OPTIONS', 'Access-Control-Allow-Headers' => 'Accept, Content-Type, X-Requested-With' } [200, headers, ['']] - end - - class DeferredBody - include EventMachine::Deferrable - alias :each :callback end end end