lib/vines/stream/http.rb in vines-0.1.0 vs lib/vines/stream/http.rb in vines-0.1.1
- old
+ new
@@ -1,111 +1,153 @@
# encoding: UTF-8
module Vines
class Stream
class Http < Client
- include Thin
- include Vines::Log
+ attr_accessor :session
- attr_accessor :last_broadcast_presence, :last_activity
-
def initialize(config)
- @config = config
- @domain = nil
- @requested_roster = false
- @available = false
- @unbound = false
- @last_broadcast_presence = nil
- @request = Thin::Request.new
- @@http_states ||= HttpStates.new
- @state = Auth.new(self)
+ super
+ @session = Http::Session.new(self)
end
- def user
- @http_state.user
+ def post_init
+ super
+ router.delete(self)
+ @parser = ::Http::Parser.new.tap do |p|
+ body = ''
+ p.on_body = proc {|data| body << data }
+ p.on_message_complete = proc {
+ process_request(body)
+ body = ''
+ }
+ end
end
- def user=(user)
- @http_state.user = user
+ # Return true if this session ID matches the stream's session ID. Clients
+ # are only allowed one session per stream so they must send the same
+ # session ID on each request.
+ def valid_session?(sid)
+ @session.id == sid
end
- def receive_data(data)
- #TODO: make sure we add max stanza size enforcement
- if @request.parse(data)
- process_http_request(@request)
- @request = Thin::Request.new
+ def max_stanza_size
+ @config[:http].max_stanza_size
+ end
+
+ def max_resources_per_account
+ @config[:http].max_resources_per_account
+ end
+
+ def process_request(body)
+ # proxy server ping
+ if body.empty?
+ req = Request.new(self, nil, 'text/plain')
+ req.reply('online')
+ close_connection_after_writing
+ else
+ body = Nokogiri::XML(body).root
+ req = Request.new(self, body['rid'], @session.content_type)
+ @session.request(req)
+ @nodes.push(body)
end
- rescue InvalidRequest => e
- error(StreamErrors::NotWellFormed.new)
end
+ # Alias the Stream#write method before overriding it so we can call
+ # it later from a Session instance.
+ alias :stream_write :write
+
+ # Override Stream#write to queue stanzas rather than immediately writing
+ # to the stream. Stanza responses must be paired with a queued request.
def write(data)
- @http_state.write(data)
+ @session.write(data)
end
- def setup_new_client(rid, domain)
- sid = Kit.uuid
- log.info("Setting up a new client SID: #{sid} for RID: #{rid}.")
- @http_state = HttpState.new(self, sid, rid, domain)
- @@http_states[sid] = @http_state
+ # Return an array of Node objects inside the body element.
+ # TODO This parses the XML again just to strip namespaces. Figure out
+ # Nokogiri namespace handling instead.
+ def parse_body(body)
+ body.namespace = nil
+ body.elements.map do |node|
+ Nokogiri::XML(node.to_s.sub(' xmlns="jabber:client"', '')).root
+ end
end
- def unbind
- #router.delete(@http_state)
- log.info("HTTP Stream disconnected:\tfrom=#{@remote_addr}\tto=#{@local_addr}")
- log.info("Streams connected: #{router.size}")
+ def start(node)
+ domain, type, hold, wait, rid = %w[to content hold wait rid].map {|a| (node[a] || '').strip }
+ version = node.attribute_with_ns('version', NAMESPACES[:bosh]).value rescue nil
+
+ @session.inactivity = 20
+ @session.domain = domain
+ @session.content_type = type unless type.empty?
+ @session.hold = hold.to_i unless hold.empty?
+ @session.wait = wait.to_i unless wait.empty?
+
+ raise StreamErrors::UndefinedCondition.new('rid required') if rid.empty?
+ raise StreamErrors::UnsupportedVersion unless version == '1.0'
+ raise StreamErrors::HostUnknown unless @config.vhost?(domain)
+ raise StreamErrors::InvalidNamespace unless node.namespaces['xmlns'] == NAMESPACES[:http_bind]
+
+ Sessions[@session.id] = @session
+ router << @session
+ send_stream_header
end
- def process_http_request(request)
- if request.body.string.empty?
- #Respond to proxy servers' status pings
- log.info("A status request has been received.")
- send_data("Online")
- close_connection_after_writing
- return
- end
- body = Nokogiri::XML(request.body.string).root
- body.namespace = nil
- #TODO: Confirm this is a valid body stanza.
- # If it isn't a body, return proxy ping result
+ def terminate
+ doc = Nokogiri::XML::Document.new
+ node = doc.create_element('body',
+ 'type' => 'terminate',
+ 'xmlns' => NAMESPACES[:http_bind])
+ @session.reply(node)
+ close_stream
+ end
- if body['sid']
- @http_state = @@http_states[body['sid']]
- unless @http_state
- log.info("Client was not found #{body['sid']}")
- send_bosh_error
- return
- end
- @domain = @http_state.domain
- @user = @http_state.user
- @http_state.request(body['rid'])
- if body['restart']
- @http_state.handle_restart
- router << @http_state
- @state = Bind.new(self)
- end
+ private
- body.elements.each do |node|
- @nodes.push(Nokogiri::XML(node.to_s.sub(' xmlns="jabber:client"', '')).root)
+ def send_stream_header
+ doc = Nokogiri::XML::Document.new
+ node = doc.create_element('body',
+ 'charsets' => 'UTF-8',
+ 'from' => @session.domain,
+ 'hold' => @session.hold,
+ 'inactivity' => @session.inactivity,
+ 'polling' => '5',
+ 'requests' => '2',
+ 'sid' => @session.id,
+ 'ver' => '1.6',
+ 'wait' => @session.wait,
+ 'xmpp:version' => '1.0',
+ 'xmlns' => NAMESPACES[:http_bind],
+ 'xmlns:xmpp' => NAMESPACES[:bosh],
+ 'xmlns:stream' => NAMESPACES[:stream])
+
+ node << doc.create_element('stream:features') do |el|
+ el << doc.create_element('mechanisms') do |mechanisms|
+ mechanisms.default_namespace = NAMESPACES[:sasl]
+ mechanisms << doc.create_element('mechanism', 'PLAIN')
end
- else
- self.setup_new_client(body['rid'], body['to'])
end
+ @session.reply(node)
end
- def send_bosh_error
- body = "<body type='terminate' condition='remote-connection-failed' xmlns='http://jabber.org/protocol/httpbind'/>"
- header = [
- "HTTP/1.1 404 OK",
- "Content-Type: text/xml; charset=utf-8",
- "Content-Length: #{body.bytesize}"
- ].join("\r\n")
-
- send_data([header, body].join("\r\n\r\n"))
+ # Override +Stream#send_stream_error+ to wrap the error XML in a BOSH
+ # terminate body tag.
+ def send_stream_error(e)
+ doc = Nokogiri::XML::Document.new
+ node = doc.create_element('body',
+ 'condition' => 'remote-stream-error',
+ 'type' => 'terminate',
+ 'xmlns' => NAMESPACES[:http_bind],
+ 'xmlns:stream' => NAMESPACES[:stream])
+ node.inner_html = e.to_xml
+ @session.reply(node)
end
- def domain
- @http_state.domain
+ # Override +Stream#close_stream+ to simply close the connection without
+ # writing a closing stream tag.
+ def close_stream
+ close_connection_after_writing
+ @session.close
end
end
end
end