lib/blather/stream.rb in blather-0.2.1 vs lib/blather/stream.rb in blather-0.2.2
- old
+ new
@@ -1,181 +1,249 @@
module Blather
- module Stream
-
- # Connect to the server
+ class Stream < EventMachine::Connection
+ ##
+ # Start the stream between client and server
+ # [client] must be an object that will respond to #call and #jid=
+ # [jid] must be a valid argument for JID.new (see JID)
+ # [pass] must be the password
+ # [host] (optional) must be the hostname or IP to connect to. defaults to the domain of [jid]
+ # [port] (optional) must be the port to connect to. defaults to 5222
def self.start(client, jid, pass, host = nil, port = 5222)
jid = JID.new jid
host ||= jid.domain
EM.connect host, port, self, client, jid, pass
end
+ ##
+ # Send data over the wire
+ # The argument for this can be anything that
+ # responds to #to_s
+ def send(stanza)
+ #TODO Queue if not ready
+ LOG.debug "SENDING: (#{caller[1]}) #{stanza}"
+ send_data stanza.respond_to?(:to_xml) ? stanza.to_xml : stanza.to_s
+ end
+
+ ##
+ # True if the stream is in the stopped state
+ def stopped?
+ @state == :stopped
+ end
+
+ ##
+ # True when the stream is in the negotiation phase.
+ def negotiating?
+ ![:stopped, :ready].include? @state
+ end
+
+ ##
+ # True when the stream is ready
+ # The stream is ready immediately after receiving <stream:stream>
+ # and before any feature negotion. Once feature negoation starts
+ # the stream will not be ready until all negotations have completed
+ # successfully.
+ def ready?
+ @state == :ready
+ end
+
+ ##
+ # Called by EM.connect to initialize stream variables
def initialize(client, jid, pass) # :nodoc:
super()
+ @error = nil
@client = client
self.jid = jid
@pass = pass
@to = @jid.domain
- @id = nil
- @lang = 'en'
- @version = '1.0'
- @namespace = 'jabber:client'
-
- @parser = Parser.new self
end
+ ##
+ # Called when EM completes the connection to the server
+ # this kicks off the starttls/authorize/bind process
def connection_completed # :nodoc:
# @keepalive = EM::Timer.new(60) { send_data ' ' }
@state = :stopped
dispatch
end
+ ##
+ # Called by EM with data from the wire
def receive_data(data) # :nodoc:
- @parser.parse data
+ LOG.debug "\n#{'-'*30}\n"
+ LOG.debug "<< #{data}"
+ @parser.receive_data data
- rescue => e
- @client.respond_to?(:rescue) ? @client.rescue(e) : raise(e)
+ rescue ParseError => e
+ @error = e
+ stop
end
+ ##
+ # Called by EM when the connection is closed
def unbind # :nodoc:
# @keepalive.cancel
@state = :stopped
+ @client.call @error if @error
+ @client.stopped
end
+ ##
+ # Called by the parser with parsed nodes
def receive(node) # :nodoc:
- LOG.debug "\n"+('-'*30)+"\n"
LOG.debug "RECEIVING (#{node.element_name}) #{node}"
@node = node
case @node.element_name
when 'stream:stream'
@state = :ready if @state == :stopped
when 'stream:end'
- @state = :stopped
+ stop
when 'stream:features'
@features = @node.children
@state = :features
dispatch
when 'stream:error'
- raise StreamError.new(@node)
+ @error = StreamError.import @node
+ stop
+ @state = :error
else
dispatch
end
end
##
- # Send data over the wire
- def send(stanza)
- #TODO Queue if not ready
- LOG.debug "SENDING: (#{caller[1]}) #{stanza}"
- send_data stanza.to_s
- end
-
- def stopped?
- @state == :stopped
- end
-
- def ready?
- @state == :ready
- end
-
+ # Ensure the JID gets attached to the client
def jid=(new_jid) # :nodoc:
LOG.debug "NEW JID: #{new_jid}"
- new_jid = JID.new new_jid
- @client.jid = new_jid
- @jid = new_jid
+ @jid = JID.new new_jid
+ @client.jid = @jid
end
- private
+ protected
+ ##
+ # Dispatch based on current state
def dispatch
__send__ @state
end
+ ##
+ # Start the stream
+ # Each time the stream is started or re-started we need to kill off the old
+ # parser so as not to confuse it
def start
- send <<-STREAM
- <stream:stream
- to='#{@to}'
- xmlns='#{@namespace}'
- xmlns:stream='http://etherx.jabber.org/streams'
- version='#{@version}'
- xml:lang='#{@lang}'
- >
- STREAM
end
+ ##
+ # Stop the stream
def stop
- send '</stream:stream>'
+ unless @state == :stopped
+ @state = :stopped
+ send '</stream:stream>'
+ end
end
+ ##
+ # Called when @state == :stopped to start the stream
+ # Counter intuitive, I know
def stopped
start
end
+ ##
+ # Called when @state == :ready
+ # Simply passes the stanza to the client
def ready
@client.call @node.to_stanza
end
+ ##
+ # Called when @state == :features
+ # Runs through the list of features starting each one in turn
def features
feature = @features.first
LOG.debug "FEATURE: #{feature}"
- @state = case feature ? feature['xmlns'] : nil
+ @state = case feature ? feature.namespaces.default.href : nil
when 'urn:ietf:params:xml:ns:xmpp-tls' then :establish_tls
when 'urn:ietf:params:xml:ns:xmpp-sasl' then :authenticate_sasl
when 'urn:ietf:params:xml:ns:xmpp-bind' then :bind_resource
when 'urn:ietf:params:xml:ns:xmpp-session' then :establish_session
else :ready
end
+ # Dispatch to the individual feature methods unless
+ # feature negotiation is complete
dispatch unless ready?
end
+ ##
+ # Start TLS
def establish_tls
unless @tls
@tls = TLS.new self
- @tls.success { LOG.debug "TLS: SUCCESS"; @tls = nil; start }
- @tls.failure { LOG.debug "TLS: FAILURE"; stop }
+ # on success destroy the TLS object and restart the stream
+ @tls.on_success { LOG.debug "TLS: SUCCESS"; @tls = nil; start }
+ # on failure stop the stream
+ @tls.on_failure { |err| LOG.debug "TLS: FAILURE"; @error = err; stop }
+
@node = @features.shift
end
- @tls.receive @node
+ @tls.handle @node
end
+ ##
+ # Authenticate via SASL
def authenticate_sasl
unless @sasl
@sasl = SASL.new(self, @jid, @pass)
- @sasl.success { LOG.debug "SASL SUCCESS"; @sasl = nil; start }
- @sasl.failure { LOG.debug "SASL FAIL"; stop }
+ # on success destroy the SASL object and restart the stream
+ @sasl.on_success { LOG.debug "SASL SUCCESS"; @sasl = nil; start }
+ # on failure set the error and stop the stream
+ @sasl.on_failure { |err| LOG.debug "SASL FAIL"; @error = err; stop }
+
@node = @features.shift
end
- @sasl.receive @node
+ @sasl.handle @node
end
+ ##
+ # Bind to the resource provided by either the client or the server
def bind_resource
unless @resource
@resource = Resource.new self, @jid
- @resource.success { |jid| LOG.debug "RESOURCE: SUCCESS"; @resource = nil; self.jid = jid; @state = :features; dispatch }
- @resource.failure { LOG.debug "RESOURCE: FAILURE"; stop }
+ # on success destroy the Resource object, set the jid, continue along the features dispatch process
+ @resource.on_success { |jid| LOG.debug "RESOURCE: SUCCESS"; @resource = nil; self.jid = jid; @state = :features; dispatch }
+ # on failure end the stream
+ @resource.on_failure { |err| LOG.debug "RESOURCE: FAILURE"; @error = err; stop }
+
@node = @features.shift
end
- @resource.receive @node
+ @resource.handle @node
end
+ ##
+ # Establish the session between client and server
def establish_session
unless @session
@session = Session.new self, @to
- @session.success { LOG.debug "SESSION: SUCCESS"; @session = nil; @client.stream_started(self); @state = :features; dispatch }
- @session.failure { LOG.debug "SESSION: FAILURE"; stop }
+ # on success destroy the session object, let the client know the stream has been started
+ # then continue the features dispatch process
+ @session.on_success { LOG.debug "SESSION: SUCCESS"; @session = nil; @client.stream_started(self); @state = :features; dispatch }
+ # on failure end the stream
+ @session.on_failure { |err| LOG.debug "SESSION: FAILURE"; @error = err; stop }
+
@node = @features.shift
end
- @session.receive @node
+ @session.handle @node
end
end
end