lib/firehose/rack.rb in firehose-0.0.8 vs lib/firehose/rack.rb in firehose-0.0.9

- old
+ new

@@ -8,33 +8,64 @@ def call(env) req = ::Rack::Request.new(env) cid = req.params['cid'] path = req.path method = req.request_method + timeout = 30 + cors_headers = { + 'Access-Control-Allow-Origin' => env['HTTP_ORIGIN'], + 'Access-Control-Allow-Methods' => 'GET', + 'Access-Control-Max-Age' => '1728000' + } case method # GET is how clients subscribe to the queue. When a messages comes in, we flush out a response, # close down the requeust, and the client then reconnects. when 'GET' + p [:subscribed, cid, path] + EM.next_tick do + # Setup a subscription with a client id. We haven't subscribed yet here. subscription = Firehose::Subscription.new(cid) + + # Setup a timeout timer to tell clients that time out that everything is OK + # and they should come back for more + timer = EM.add_timer(timeout) do + # We send a 204 OK to tell the client to reconnect. + env['async.callback'].call [204, cors_headers, []] + p [:timeout] + end + + # Ok, now subscribe to the subscription. subscription.subscribe path do |payload| subscription.unsubscribe - env['async.callback'].call([200, {}, [payload]]) + subscription = nil # Set this to nil so that our heart beat timer doesn't try to double unsub. + EM.cancel_timer timer # Turn off the heart beat so we don't execute any of that business. + env['async.callback'].call [200, cors_headers, [payload]] end + + # Unsubscribe from the subscription if its still open and something bad happened + # or the heart beat triggered before we could finish. + env['async.close'].callback do + if subscription + subscription.unsubscribe + p [:close_unsubscription] + end + end end + # Tell the web server that this will be an async response. Firehose::Rack::AsyncResponse # PUT is how we throw messages on to the fan-out queue. when 'PUT' body = env['rack.input'].read p [:put, path, body] Firehose::Publisher.new.publish(path, body) [202, {}, []] else - [501, {}, ["#{method} not supported."]] + [501, {'Content-Type' => 'text/plain'}, ["#{method} not supported."]] end end end class WebSocket < ::Rack::WebSocket::Application \ No newline at end of file