lib/firehose/rack.rb in firehose-0.0.16 vs lib/firehose/rack.rb in firehose-0.1.0
- old
+ new
@@ -3,20 +3,18 @@
module Firehose
module Rack
AsyncResponse = [-1, {}, []]
class HttpLongPoll
- def initialize(broker)
- @broker = broker
- end
-
def call(env)
req = ::Rack::Request.new(env)
cid = req.params['cid']
path = req.path
method = req.request_method
timeout = 30
+ queue_name = "#{cid}@#{path}"
+
# TODO seperate out CORS logic as an async middleware with a Goliath web server.
cors_origin = env['HTTP_ORIGIN']
cors_headers = {
'Access-Control-Allow-Origin' => cors_origin,
'Access-Control-Allow-Methods' => 'GET',
@@ -31,33 +29,40 @@
EM.next_tick do
# If the request is a CORS request, return those headers, otherwise don't worry 'bout it
response_headers = cors_origin ? cors_headers : {}
# Setup a subscription with a client id. We haven't subscribed yet here.
- consumer = @broker.consumer(cid)
+ if queue = queues[queue_name]
+ queue.live
+ else
+ queue = queues[queue_name] = Firehose::Subscription::Queue.new(cid, path)
+ end
# Setup a timeout timer to tell clients that time out that everything is OK
# and they should come back for more
- timer = EventMachine::Timer.new(timeout) do
+ long_poll_timer = EM::Timer.new(timeout) do
# We send a 204 OK to tell the client to reconnect.
env['async.callback'].call [204, response_headers, []]
Firehose.logger.debug "HTTP wait `#{cid}@#{path}` timed out"
end
# Ok, now subscribe to the subscription.
- consumer.subscribe_to path do |message, subscription|
- timer.cancel # Turn off the heart beat so we don't execute any of that business.
- consumer.unsubscribe
+ queue.pop do |message, subscription|
+ long_poll_timer.cancel # Turn off the heart beat so we don't execute any of that business.
env['async.callback'].call [200, response_headers, [message]]
Firehose.logger.debug "HTTP sent `#{message}` to `#{cid}@#{path}`"
end
Firehose.logger.debug "HTTP subscribed to `#{cid}@#{path}`"
# Unsubscribe from the subscription if its still open and something bad happened
# or the heart beat triggered before we could finish.
env['async.close'].callback do
- consumer.unsubscribe if consumer
+ # Kill queue if we don't hear back in 30s
+ queue.kill timeout do
+ Firehose.logger.debug "Deleting queue to `#{queue_name}`"
+ queues.delete queue_name
+ end
Firehose.logger.debug "HTTP connection `#{cid}@#{path}` closing"
end
end
# Tell the web server that this will be an async response.
@@ -65,73 +70,69 @@
# PUT is how we throw messages on to the fan-out queue.
when 'PUT'
body = env['rack.input'].read
Firehose.logger.debug "HTTP published `#{body}` to `#{path}`"
- Firehose::Publisher.new.publish(path, body)
+ publisher.publish(path, body)
[202, {}, []]
else
Firehose.logger.debug "HTTP #{method} not supported"
[501, {'Content-Type' => 'text/plain'}, ["#{method} not supported."]]
end
end
+
+ private
+ def publisher
+ @publisher ||= Firehose::Publisher.new
+ end
+
+ def queues
+ @queues ||= {}
+ end
end
class WebSocket < ::Rack::WebSocket::Application
- attr_reader :cid, :path
+ attr_reader :cid, :path, :subscription
- def initialize(broker)
- @broker = broker
- end
-
# Subscribe to a path and make some magic happen, mmkmay?
def on_open(env)
req = ::Rack::Request.new(env)
@cid = req.params['cid']
@path = req.path
+ @subscription = Firehose::Subscription.new(cid, path)
- @consumer = @broker.consumer(@cid)
- @consumer.subscribe_to path do |message|
+ subscription.subscribe do |message, subscription|
Firehose.logger.debug "WS sent `#{message}` to `#{cid}@#{path}`"
send_data message
end
Firehose.logger.debug "WS subscribed to `#{cid}@#{path}`"
end
# Delete the subscription if the thing even happened.
def on_close(env)
- @consumer.unsubscribe if @consumer
+ subscription.unsubscribe
Firehose.logger.debug "WS connection `#{cid}@#{path}` closing"
end
# Log websocket level errors
def on_error(env, error)
- Firehose.logger.error "WS connection `#{cid}@#{path}` error `#{error}`: #{env.inspect}"
- @consumer.unsubscribe if @consumer
+ Firehose.logger.error "WS connection `#{cid}@#{path}` error `#{error}`: #{error.backtrace}"
end
end
class App
- # Firehose broker that will be used to pub/sub messages.
- attr_reader :broker
-
- # Fire up a default broker if one is not specified.
- def initialize(broker = Firehose::Broker.new)
- @broker = broker
- end
-
def call(env)
websocket_request?(env) ? websocket.call(env) : http_long_poll.call(env)
end
private
def websocket
- @websocket ||= WebSocket.new(broker)
+ WebSocket.new
end
def http_long_poll
- @http_long_poll ||= HttpLongPoll.new(broker)
+ @http_long_poll ||= HttpLongPoll.new
end
def websocket_request?(env)
env['HTTP_UPGRADE'] =~ /websocket/i
end
\ No newline at end of file