lib/firehose/rack.rb in firehose-0.0.16 vs lib/firehose/rack.rb in firehose-0.1.0

- old
+ new

@@ -3,20 +3,18 @@ module Firehose module Rack AsyncResponse = [-1, {}, []] class HttpLongPoll - def initialize(broker) - @broker = broker - end - def call(env) req = ::Rack::Request.new(env) cid = req.params['cid'] path = req.path method = req.request_method timeout = 30 + queue_name = "#{cid}@#{path}" + # TODO seperate out CORS logic as an async middleware with a Goliath web server. cors_origin = env['HTTP_ORIGIN'] cors_headers = { 'Access-Control-Allow-Origin' => cors_origin, 'Access-Control-Allow-Methods' => 'GET', @@ -31,33 +29,40 @@ EM.next_tick do # If the request is a CORS request, return those headers, otherwise don't worry 'bout it response_headers = cors_origin ? cors_headers : {} # Setup a subscription with a client id. We haven't subscribed yet here. - consumer = @broker.consumer(cid) + if queue = queues[queue_name] + queue.live + else + queue = queues[queue_name] = Firehose::Subscription::Queue.new(cid, path) + end # Setup a timeout timer to tell clients that time out that everything is OK # and they should come back for more - timer = EventMachine::Timer.new(timeout) do + long_poll_timer = EM::Timer.new(timeout) do # We send a 204 OK to tell the client to reconnect. env['async.callback'].call [204, response_headers, []] Firehose.logger.debug "HTTP wait `#{cid}@#{path}` timed out" end # Ok, now subscribe to the subscription. - consumer.subscribe_to path do |message, subscription| - timer.cancel # Turn off the heart beat so we don't execute any of that business. - consumer.unsubscribe + queue.pop do |message, subscription| + long_poll_timer.cancel # Turn off the heart beat so we don't execute any of that business. env['async.callback'].call [200, response_headers, [message]] Firehose.logger.debug "HTTP sent `#{message}` to `#{cid}@#{path}`" end Firehose.logger.debug "HTTP subscribed to `#{cid}@#{path}`" # Unsubscribe from the subscription if its still open and something bad happened # or the heart beat triggered before we could finish. env['async.close'].callback do - consumer.unsubscribe if consumer + # Kill queue if we don't hear back in 30s + queue.kill timeout do + Firehose.logger.debug "Deleting queue to `#{queue_name}`" + queues.delete queue_name + end Firehose.logger.debug "HTTP connection `#{cid}@#{path}` closing" end end # Tell the web server that this will be an async response. @@ -65,73 +70,69 @@ # PUT is how we throw messages on to the fan-out queue. when 'PUT' body = env['rack.input'].read Firehose.logger.debug "HTTP published `#{body}` to `#{path}`" - Firehose::Publisher.new.publish(path, body) + publisher.publish(path, body) [202, {}, []] else Firehose.logger.debug "HTTP #{method} not supported" [501, {'Content-Type' => 'text/plain'}, ["#{method} not supported."]] end end + + private + def publisher + @publisher ||= Firehose::Publisher.new + end + + def queues + @queues ||= {} + end end class WebSocket < ::Rack::WebSocket::Application - attr_reader :cid, :path + attr_reader :cid, :path, :subscription - def initialize(broker) - @broker = broker - end - # Subscribe to a path and make some magic happen, mmkmay? def on_open(env) req = ::Rack::Request.new(env) @cid = req.params['cid'] @path = req.path + @subscription = Firehose::Subscription.new(cid, path) - @consumer = @broker.consumer(@cid) - @consumer.subscribe_to path do |message| + subscription.subscribe do |message, subscription| Firehose.logger.debug "WS sent `#{message}` to `#{cid}@#{path}`" send_data message end Firehose.logger.debug "WS subscribed to `#{cid}@#{path}`" end # Delete the subscription if the thing even happened. def on_close(env) - @consumer.unsubscribe if @consumer + subscription.unsubscribe Firehose.logger.debug "WS connection `#{cid}@#{path}` closing" end # Log websocket level errors def on_error(env, error) - Firehose.logger.error "WS connection `#{cid}@#{path}` error `#{error}`: #{env.inspect}" - @consumer.unsubscribe if @consumer + Firehose.logger.error "WS connection `#{cid}@#{path}` error `#{error}`: #{error.backtrace}" end end class App - # Firehose broker that will be used to pub/sub messages. - attr_reader :broker - - # Fire up a default broker if one is not specified. - def initialize(broker = Firehose::Broker.new) - @broker = broker - end - def call(env) websocket_request?(env) ? websocket.call(env) : http_long_poll.call(env) end private def websocket - @websocket ||= WebSocket.new(broker) + WebSocket.new end def http_long_poll - @http_long_poll ||= HttpLongPoll.new(broker) + @http_long_poll ||= HttpLongPoll.new end def websocket_request?(env) env['HTTP_UPGRADE'] =~ /websocket/i end \ No newline at end of file