lib/vines/stream/http.rb in vines-0.1.0 vs lib/vines/stream/http.rb in vines-0.1.1

- old
+ new

@@ -1,111 +1,153 @@ # encoding: UTF-8 module Vines class Stream class Http < Client - include Thin - include Vines::Log + attr_accessor :session - attr_accessor :last_broadcast_presence, :last_activity - def initialize(config) - @config = config - @domain = nil - @requested_roster = false - @available = false - @unbound = false - @last_broadcast_presence = nil - @request = Thin::Request.new - @@http_states ||= HttpStates.new - @state = Auth.new(self) + super + @session = Http::Session.new(self) end - def user - @http_state.user + def post_init + super + router.delete(self) + @parser = ::Http::Parser.new.tap do |p| + body = '' + p.on_body = proc {|data| body << data } + p.on_message_complete = proc { + process_request(body) + body = '' + } + end end - def user=(user) - @http_state.user = user + # Return true if this session ID matches the stream's session ID. Clients + # are only allowed one session per stream so they must send the same + # session ID on each request. + def valid_session?(sid) + @session.id == sid end - def receive_data(data) - #TODO: make sure we add max stanza size enforcement - if @request.parse(data) - process_http_request(@request) - @request = Thin::Request.new + def max_stanza_size + @config[:http].max_stanza_size + end + + def max_resources_per_account + @config[:http].max_resources_per_account + end + + def process_request(body) + # proxy server ping + if body.empty? + req = Request.new(self, nil, 'text/plain') + req.reply('online') + close_connection_after_writing + else + body = Nokogiri::XML(body).root + req = Request.new(self, body['rid'], @session.content_type) + @session.request(req) + @nodes.push(body) end - rescue InvalidRequest => e - error(StreamErrors::NotWellFormed.new) end + # Alias the Stream#write method before overriding it so we can call + # it later from a Session instance. + alias :stream_write :write + + # Override Stream#write to queue stanzas rather than immediately writing + # to the stream. Stanza responses must be paired with a queued request. def write(data) - @http_state.write(data) + @session.write(data) end - def setup_new_client(rid, domain) - sid = Kit.uuid - log.info("Setting up a new client SID: #{sid} for RID: #{rid}.") - @http_state = HttpState.new(self, sid, rid, domain) - @@http_states[sid] = @http_state + # Return an array of Node objects inside the body element. + # TODO This parses the XML again just to strip namespaces. Figure out + # Nokogiri namespace handling instead. + def parse_body(body) + body.namespace = nil + body.elements.map do |node| + Nokogiri::XML(node.to_s.sub(' xmlns="jabber:client"', '')).root + end end - def unbind - #router.delete(@http_state) - log.info("HTTP Stream disconnected:\tfrom=#{@remote_addr}\tto=#{@local_addr}") - log.info("Streams connected: #{router.size}") + def start(node) + domain, type, hold, wait, rid = %w[to content hold wait rid].map {|a| (node[a] || '').strip } + version = node.attribute_with_ns('version', NAMESPACES[:bosh]).value rescue nil + + @session.inactivity = 20 + @session.domain = domain + @session.content_type = type unless type.empty? + @session.hold = hold.to_i unless hold.empty? + @session.wait = wait.to_i unless wait.empty? + + raise StreamErrors::UndefinedCondition.new('rid required') if rid.empty? + raise StreamErrors::UnsupportedVersion unless version == '1.0' + raise StreamErrors::HostUnknown unless @config.vhost?(domain) + raise StreamErrors::InvalidNamespace unless node.namespaces['xmlns'] == NAMESPACES[:http_bind] + + Sessions[@session.id] = @session + router << @session + send_stream_header end - def process_http_request(request) - if request.body.string.empty? - #Respond to proxy servers' status pings - log.info("A status request has been received.") - send_data("Online") - close_connection_after_writing - return - end - body = Nokogiri::XML(request.body.string).root - body.namespace = nil - #TODO: Confirm this is a valid body stanza. - # If it isn't a body, return proxy ping result + def terminate + doc = Nokogiri::XML::Document.new + node = doc.create_element('body', + 'type' => 'terminate', + 'xmlns' => NAMESPACES[:http_bind]) + @session.reply(node) + close_stream + end - if body['sid'] - @http_state = @@http_states[body['sid']] - unless @http_state - log.info("Client was not found #{body['sid']}") - send_bosh_error - return - end - @domain = @http_state.domain - @user = @http_state.user - @http_state.request(body['rid']) - if body['restart'] - @http_state.handle_restart - router << @http_state - @state = Bind.new(self) - end + private - body.elements.each do |node| - @nodes.push(Nokogiri::XML(node.to_s.sub(' xmlns="jabber:client"', '')).root) + def send_stream_header + doc = Nokogiri::XML::Document.new + node = doc.create_element('body', + 'charsets' => 'UTF-8', + 'from' => @session.domain, + 'hold' => @session.hold, + 'inactivity' => @session.inactivity, + 'polling' => '5', + 'requests' => '2', + 'sid' => @session.id, + 'ver' => '1.6', + 'wait' => @session.wait, + 'xmpp:version' => '1.0', + 'xmlns' => NAMESPACES[:http_bind], + 'xmlns:xmpp' => NAMESPACES[:bosh], + 'xmlns:stream' => NAMESPACES[:stream]) + + node << doc.create_element('stream:features') do |el| + el << doc.create_element('mechanisms') do |mechanisms| + mechanisms.default_namespace = NAMESPACES[:sasl] + mechanisms << doc.create_element('mechanism', 'PLAIN') end - else - self.setup_new_client(body['rid'], body['to']) end + @session.reply(node) end - def send_bosh_error - body = "<body type='terminate' condition='remote-connection-failed' xmlns='http://jabber.org/protocol/httpbind'/>" - header = [ - "HTTP/1.1 404 OK", - "Content-Type: text/xml; charset=utf-8", - "Content-Length: #{body.bytesize}" - ].join("\r\n") - - send_data([header, body].join("\r\n\r\n")) + # Override +Stream#send_stream_error+ to wrap the error XML in a BOSH + # terminate body tag. + def send_stream_error(e) + doc = Nokogiri::XML::Document.new + node = doc.create_element('body', + 'condition' => 'remote-stream-error', + 'type' => 'terminate', + 'xmlns' => NAMESPACES[:http_bind], + 'xmlns:stream' => NAMESPACES[:stream]) + node.inner_html = e.to_xml + @session.reply(node) end - def domain - @http_state.domain + # Override +Stream#close_stream+ to simply close the connection without + # writing a closing stream tag. + def close_stream + close_connection_after_writing + @session.close end end end end