lib/blather/stream.rb in blather-0.2.1 vs lib/blather/stream.rb in blather-0.2.2

- old
+ new

@@ -1,181 +1,249 @@ module Blather - module Stream - - # Connect to the server + class Stream < EventMachine::Connection + ## + # Start the stream between client and server + # [client] must be an object that will respond to #call and #jid= + # [jid] must be a valid argument for JID.new (see JID) + # [pass] must be the password + # [host] (optional) must be the hostname or IP to connect to. defaults to the domain of [jid] + # [port] (optional) must be the port to connect to. defaults to 5222 def self.start(client, jid, pass, host = nil, port = 5222) jid = JID.new jid host ||= jid.domain EM.connect host, port, self, client, jid, pass end + ## + # Send data over the wire + # The argument for this can be anything that + # responds to #to_s + def send(stanza) + #TODO Queue if not ready + LOG.debug "SENDING: (#{caller[1]}) #{stanza}" + send_data stanza.respond_to?(:to_xml) ? stanza.to_xml : stanza.to_s + end + + ## + # True if the stream is in the stopped state + def stopped? + @state == :stopped + end + + ## + # True when the stream is in the negotiation phase. + def negotiating? + ![:stopped, :ready].include? @state + end + + ## + # True when the stream is ready + # The stream is ready immediately after receiving <stream:stream> + # and before any feature negotion. Once feature negoation starts + # the stream will not be ready until all negotations have completed + # successfully. + def ready? + @state == :ready + end + + ## + # Called by EM.connect to initialize stream variables def initialize(client, jid, pass) # :nodoc: super() + @error = nil @client = client self.jid = jid @pass = pass @to = @jid.domain - @id = nil - @lang = 'en' - @version = '1.0' - @namespace = 'jabber:client' - - @parser = Parser.new self end + ## + # Called when EM completes the connection to the server + # this kicks off the starttls/authorize/bind process def connection_completed # :nodoc: # @keepalive = EM::Timer.new(60) { send_data ' ' } @state = :stopped dispatch end + ## + # Called by EM with data from the wire def receive_data(data) # :nodoc: - @parser.parse data + LOG.debug "\n#{'-'*30}\n" + LOG.debug "<< #{data}" + @parser.receive_data data - rescue => e - @client.respond_to?(:rescue) ? @client.rescue(e) : raise(e) + rescue ParseError => e + @error = e + stop end + ## + # Called by EM when the connection is closed def unbind # :nodoc: # @keepalive.cancel @state = :stopped + @client.call @error if @error + @client.stopped end + ## + # Called by the parser with parsed nodes def receive(node) # :nodoc: - LOG.debug "\n"+('-'*30)+"\n" LOG.debug "RECEIVING (#{node.element_name}) #{node}" @node = node case @node.element_name when 'stream:stream' @state = :ready if @state == :stopped when 'stream:end' - @state = :stopped + stop when 'stream:features' @features = @node.children @state = :features dispatch when 'stream:error' - raise StreamError.new(@node) + @error = StreamError.import @node + stop + @state = :error else dispatch end end ## - # Send data over the wire - def send(stanza) - #TODO Queue if not ready - LOG.debug "SENDING: (#{caller[1]}) #{stanza}" - send_data stanza.to_s - end - - def stopped? - @state == :stopped - end - - def ready? - @state == :ready - end - + # Ensure the JID gets attached to the client def jid=(new_jid) # :nodoc: LOG.debug "NEW JID: #{new_jid}" - new_jid = JID.new new_jid - @client.jid = new_jid - @jid = new_jid + @jid = JID.new new_jid + @client.jid = @jid end - private + protected + ## + # Dispatch based on current state def dispatch __send__ @state end + ## + # Start the stream + # Each time the stream is started or re-started we need to kill off the old + # parser so as not to confuse it def start - send <<-STREAM - <stream:stream - to='#{@to}' - xmlns='#{@namespace}' - xmlns:stream='http://etherx.jabber.org/streams' - version='#{@version}' - xml:lang='#{@lang}' - > - STREAM end + ## + # Stop the stream def stop - send '</stream:stream>' + unless @state == :stopped + @state = :stopped + send '</stream:stream>' + end end + ## + # Called when @state == :stopped to start the stream + # Counter intuitive, I know def stopped start end + ## + # Called when @state == :ready + # Simply passes the stanza to the client def ready @client.call @node.to_stanza end + ## + # Called when @state == :features + # Runs through the list of features starting each one in turn def features feature = @features.first LOG.debug "FEATURE: #{feature}" - @state = case feature ? feature['xmlns'] : nil + @state = case feature ? feature.namespaces.default.href : nil when 'urn:ietf:params:xml:ns:xmpp-tls' then :establish_tls when 'urn:ietf:params:xml:ns:xmpp-sasl' then :authenticate_sasl when 'urn:ietf:params:xml:ns:xmpp-bind' then :bind_resource when 'urn:ietf:params:xml:ns:xmpp-session' then :establish_session else :ready end + # Dispatch to the individual feature methods unless + # feature negotiation is complete dispatch unless ready? end + ## + # Start TLS def establish_tls unless @tls @tls = TLS.new self - @tls.success { LOG.debug "TLS: SUCCESS"; @tls = nil; start } - @tls.failure { LOG.debug "TLS: FAILURE"; stop } + # on success destroy the TLS object and restart the stream + @tls.on_success { LOG.debug "TLS: SUCCESS"; @tls = nil; start } + # on failure stop the stream + @tls.on_failure { |err| LOG.debug "TLS: FAILURE"; @error = err; stop } + @node = @features.shift end - @tls.receive @node + @tls.handle @node end + ## + # Authenticate via SASL def authenticate_sasl unless @sasl @sasl = SASL.new(self, @jid, @pass) - @sasl.success { LOG.debug "SASL SUCCESS"; @sasl = nil; start } - @sasl.failure { LOG.debug "SASL FAIL"; stop } + # on success destroy the SASL object and restart the stream + @sasl.on_success { LOG.debug "SASL SUCCESS"; @sasl = nil; start } + # on failure set the error and stop the stream + @sasl.on_failure { |err| LOG.debug "SASL FAIL"; @error = err; stop } + @node = @features.shift end - @sasl.receive @node + @sasl.handle @node end + ## + # Bind to the resource provided by either the client or the server def bind_resource unless @resource @resource = Resource.new self, @jid - @resource.success { |jid| LOG.debug "RESOURCE: SUCCESS"; @resource = nil; self.jid = jid; @state = :features; dispatch } - @resource.failure { LOG.debug "RESOURCE: FAILURE"; stop } + # on success destroy the Resource object, set the jid, continue along the features dispatch process + @resource.on_success { |jid| LOG.debug "RESOURCE: SUCCESS"; @resource = nil; self.jid = jid; @state = :features; dispatch } + # on failure end the stream + @resource.on_failure { |err| LOG.debug "RESOURCE: FAILURE"; @error = err; stop } + @node = @features.shift end - @resource.receive @node + @resource.handle @node end + ## + # Establish the session between client and server def establish_session unless @session @session = Session.new self, @to - @session.success { LOG.debug "SESSION: SUCCESS"; @session = nil; @client.stream_started(self); @state = :features; dispatch } - @session.failure { LOG.debug "SESSION: FAILURE"; stop } + # on success destroy the session object, let the client know the stream has been started + # then continue the features dispatch process + @session.on_success { LOG.debug "SESSION: SUCCESS"; @session = nil; @client.stream_started(self); @state = :features; dispatch } + # on failure end the stream + @session.on_failure { |err| LOG.debug "SESSION: FAILURE"; @error = err; stop } + @node = @features.shift end - @session.receive @node + @session.handle @node end end end