lib/firehose/rack.rb in firehose-0.0.8 vs lib/firehose/rack.rb in firehose-0.0.9
- old
+ new
@@ -8,33 +8,64 @@
def call(env)
req = ::Rack::Request.new(env)
cid = req.params['cid']
path = req.path
method = req.request_method
+ timeout = 30
+ cors_headers = {
+ 'Access-Control-Allow-Origin' => env['HTTP_ORIGIN'],
+ 'Access-Control-Allow-Methods' => 'GET',
+ 'Access-Control-Max-Age' => '1728000'
+ }
case method
# GET is how clients subscribe to the queue. When a messages comes in, we flush out a response,
# close down the requeust, and the client then reconnects.
when 'GET'
+ p [:subscribed, cid, path]
+
EM.next_tick do
+ # Setup a subscription with a client id. We haven't subscribed yet here.
subscription = Firehose::Subscription.new(cid)
+
+ # Setup a timeout timer to tell clients that time out that everything is OK
+ # and they should come back for more
+ timer = EM.add_timer(timeout) do
+ # We send a 204 OK to tell the client to reconnect.
+ env['async.callback'].call [204, cors_headers, []]
+ p [:timeout]
+ end
+
+ # Ok, now subscribe to the subscription.
subscription.subscribe path do |payload|
subscription.unsubscribe
- env['async.callback'].call([200, {}, [payload]])
+ subscription = nil # Set this to nil so that our heart beat timer doesn't try to double unsub.
+ EM.cancel_timer timer # Turn off the heart beat so we don't execute any of that business.
+ env['async.callback'].call [200, cors_headers, [payload]]
end
+
+ # Unsubscribe from the subscription if its still open and something bad happened
+ # or the heart beat triggered before we could finish.
+ env['async.close'].callback do
+ if subscription
+ subscription.unsubscribe
+ p [:close_unsubscription]
+ end
+ end
end
+ # Tell the web server that this will be an async response.
Firehose::Rack::AsyncResponse
# PUT is how we throw messages on to the fan-out queue.
when 'PUT'
body = env['rack.input'].read
p [:put, path, body]
Firehose::Publisher.new.publish(path, body)
[202, {}, []]
else
- [501, {}, ["#{method} not supported."]]
+ [501, {'Content-Type' => 'text/plain'}, ["#{method} not supported."]]
end
end
end
class WebSocket < ::Rack::WebSocket::Application
\ No newline at end of file