lib/firehose/rack/consumer.rb in firehose-1.2.8 vs lib/firehose/rack/consumer.rb in firehose-1.2.9
- old
+ new
@@ -1,163 +1,39 @@
-require 'faye/websocket'
require 'json'
module Firehose
module Rack
# Handles a subscription request over HTTP or WebSockets depeding on its abilities and
# binds that to the Firehose::Server::Subscription class, which is bound to a channel that
# gets published to.
class Consumer
+ # Rack consumer transports
+ autoload :HttpLongPoll, 'firehose/rack/consumer/http_long_poll'
+ autoload :WebSocket, 'firehose/rack/consumer/web_socket'
+
+ # Let the client configure the consumer on initialization.
+ def initialize
+ yield self if block_given?
+ end
+
def call(env)
websocket_request?(env) ? websocket.call(env) : http_long_poll.call(env)
end
- private
+ # Memoized instance of web socket that can be configured from the rack app.
def websocket
- WebSocket.new
+ @web_socket ||= WebSocket.new
end
+ # Memoized instance of http long poll handler that can be configured from the rack app.
def http_long_poll
@http_long_poll ||= HttpLongPoll.new
end
+ private
+ # Determine if the incoming request is a websocket request.
def websocket_request?(env)
- Faye::WebSocket.websocket?(env)
+ Firehose::Rack::Consumer::WebSocket.request?(env)
end
-
- class HttpLongPoll
- include Firehose::Rack::Helpers
-
- # How long should we wait before closing out the consuming clients web connection
- # for long polling? Most browsers timeout after a connection has been idle for 30s.
- TIMEOUT = 20
-
- def call(env)
- req = env['parsed_request'] ||= ::Rack::Request.new(env)
- path = req.path
- method = req.request_method
- # Get the Last Message Sequence from the query string.
- # Ideally we'd use an HTTP header, but android devices don't let us
- # set any HTTP headers for CORS requests.
- last_sequence = req.params['last_message_sequence'].to_i
-
- case method
- # GET is how clients subscribe to the queue. When a messages comes in, we flush out a response,
- # close down the requeust, and the client then reconnects.
- when 'GET'
- Firehose.logger.debug "HTTP GET with last_sequence #{last_sequence} for path #{path} with query #{env["QUERY_STRING"].inspect} and params #{req.params.inspect}"
- EM.next_tick do
-
- if last_sequence < 0
- env['async.callback'].call response(400, "The last_message_sequence parameter may not be less than zero", response_headers(env))
- else
- Server::Channel.new(path).next_message(last_sequence, :timeout => TIMEOUT).callback do |message, sequence|
- env['async.callback'].call response(200, wrap_frame(message, sequence), response_headers(env))
- end.errback do |e|
- if e == :timeout
- env['async.callback'].call response(204, '', response_headers(env))
- else
- Firehose.logger.error "Unexpected error when trying to GET last_sequence #{last_sequence} for path #{path}: #{e.inspect}"
- env['async.callback'].call response(500, 'Unexpected error', response_headers(env))
- end
- end
- end
-
- end
-
- # Tell the web server that this will be an async response.
- ASYNC_RESPONSE
-
- else
- Firehose.logger.debug "HTTP #{method} not supported"
- response(501, "#{method} not supported.")
- end
- end
-
-
- private
-
- def wrap_frame(message, last_sequence)
- JSON.generate :message => message, :last_sequence => last_sequence
- end
-
- # If the request is a CORS request, return those headers, otherwise don't worry 'bout it
- def response_headers(env)
- cors_origin(env) ? cors_headers(env) : {}
- end
-
- def cors_origin(env)
- env['HTTP_ORIGIN']
- end
-
- def cors_headers(env)
- # TODO seperate out CORS logic as an async middleware with a Goliath web server.
- {'Access-Control-Allow-Origin' => cors_origin(env)}
- end
- end
-
-
- # It _may_ be more memory efficient if we used the same instance of
- # this class (or if we even just used a lambda) for every connection.
- class WebSocket
- def call(env)
- req = ::Rack::Request.new env
- @ws = Faye::WebSocket.new env
- @path = req.path
- @ws.onopen = method :handle_open
- @ws.onclose = method :handle_close
- @ws.onerror = method :handle_error
- @ws.onmessage = method :handle_message
- return @ws.rack_response
- end
-
- private
- def subscribe(last_sequence)
- @subscribed = true
- @channel = Server::Channel.new @path
- @deferrable = @channel.next_message last_sequence
- @deferrable.callback do |message, sequence|
- Firehose.logger.debug "WS sent `#{message}` to `#{@path}` with sequence `#{sequence}`"
- @ws.send self.class.wrap_frame(message, last_sequence)
- subscribe sequence
- end
- @deferrable.errback do |e|
- EM.next_tick { raise e.inspect } unless e == :disconnect
- end
- end
-
- def handle_message(event)
- msg = JSON.parse(event.data, :symbolize_names => true) rescue {}
- seq = msg[:message_sequence]
- if msg[:ping] == 'PING'
- Firehose.logger.debug "WS ping received, sending pong"
- @ws.send JSON.generate :pong => 'PONG'
- elsif !@subscribed && seq.kind_of?(Integer)
- Firehose.logger.debug "Subscribing at message_sequence #{seq}"
- subscribe seq
- end
- end
-
- def handle_open(event)
- Firehose.logger.debug "WebSocket subscribed to `#{@path}`. Waiting for message_sequence..."
- end
-
- def handle_close(event)
- if @deferrable
- @deferrable.fail :disconnect
- @channel.unsubscribe(@deferrable) if @channel
- end
- Firehose.logger.debug "WS connection `#{@path}` closing. Code: #{event.code.inspect}; Reason #{event.reason.inspect}"
- end
-
- def handle_error(event)
- Firehose.logger.error "WS connection `#{@path}` error. Message: `#{event.message.inspect}`; Data: `#{event.data.inspect}`"
- end
-
- def self.wrap_frame(message, last_sequence)
- JSON.generate :message => message, :last_sequence => last_sequence
- end
- end
-
end
end
end