lib/blather/stream.rb in blather-0.3.4 vs lib/blather/stream.rb in blather-0.4.0
- old
+ new
@@ -1,258 +1,166 @@
module Blather
class Stream < EventMachine::Connection
+ class NoConnection < RuntimeError; end
+
+ STREAM_NS = 'http://etherx.jabber.org/streams'
+ attr_accessor :jid, :password
+
##
# Start the stream between client and server
# [client] must be an object that will respond to #call and #jid=
# [jid] must be a valid argument for JID.new (see JID)
# [pass] must be the password
# [host] (optional) must be the hostname or IP to connect to. defaults to the domain of [jid]
# [port] (optional) must be the port to connect to. defaults to 5222
def self.start(client, jid, pass, host = nil, port = 5222)
jid = JID.new jid
- host ||= jid.domain
+ if host
+ connect host, port, self, client, jid, pass
+ else
+ require 'resolv'
+ srv = []
+ Resolv::DNS.open { |dns| srv = dns.getresources("_xmpp-client._tcp.#{jid.domain}", Resolv::DNS::Resource::IN::SRV) }
+ if srv.empty?
+ connect jid.domain, port, self, client, jid, pass
+ else
+ srv.sort! { |a,b| (a.priority != b.priority) ? (a.priority <=> b.priority) : (b.weight <=> a.weight) }
+ conn = nil
+ srv.each { |r| break unless (conn = connect(r.target.to_s, r.port, self, client, jid, pass)) === false }
+ conn
+ end
+ end
+ end
- EM.connect host, port, self, client, jid, pass
+ ##
+ # Attempt a connection
+ # Stream will raise +NoConnection+ if it receives #unbind before #post_init
+ # this catches that and returns false prompting for another attempt
+ def self.connect(host, port, conn, client, jid, pass)
+ EM.connect host, port, conn, client, jid, pass
+ rescue NoConnection
+ false
end
+ [:started, :stopped, :ready, :negotiating].each do |state|
+ define_method("#{state}?") { @state == state }
+ end
+
##
# Send data over the wire
# The argument for this can be anything that
# responds to #to_s
def send(stanza)
#TODO Queue if not ready
- LOG.debug "SENDING: (#{caller[1]}) #{stanza}"
+ Blather.logger.debug "SENDING: (#{caller[1]}) #{stanza}"
send_data stanza.respond_to?(:to_xml) ? stanza.to_xml : stanza.to_s
end
##
- # True if the stream is in the stopped state
- def stopped?
- @state == :stopped
- end
-
- ##
- # True when the stream is in the negotiation phase.
- def negotiating?
- ![:stopped, :ready].include? @state
- end
-
- ##
- # True when the stream is ready
- # The stream is ready immediately after receiving <stream:stream>
- # and before any feature negotion. Once feature negoation starts
- # the stream will not be ready until all negotations have completed
- # successfully.
- def ready?
- @state == :ready
- end
-
- ##
# Called by EM.connect to initialize stream variables
def initialize(client, jid, pass) # :nodoc:
super()
@error = nil
- @client = client
+ @receiver = @client = client
self.jid = jid
- @pass = pass
-
- @to = @jid.domain
+ @to = self.jid.domain
+ @password = pass
end
##
# Called when EM completes the connection to the server
# this kicks off the starttls/authorize/bind process
def connection_completed # :nodoc:
-# @keepalive = EM::Timer.new(60) { send_data ' ' }
- @state = :stopped
- dispatch
+# @keepalive = EM::PeriodicTimer.new(60) { send_data ' ' }
+ start
end
##
# Called by EM with data from the wire
def receive_data(data) # :nodoc:
- LOG.debug "\n#{'-'*30}\n"
- LOG.debug "<< #{data}"
- @parser.receive_data data
+ Blather.logger.debug "\n#{'-'*30}\n"
+ Blather.logger.debug "<< #{data}"
+ @parser << data
- rescue ParseWarning => e
- @client.receive_data e
rescue ParseError => e
@error = e
- send "<stream:error><xml-not-well-formed xmlns='urn:ietf:params:xml:ns:xmpp-streams'/></stream:error>"
+ send "<stream:error><xml-not-well-formed xmlns='#{StreamError::STREAM_ERR_NS}'/></stream:error>"
stop
end
+ def post_init
+ @connected = true
+ end
+
##
# Called by EM when the connection is closed
def unbind # :nodoc:
+ raise NoConnection unless @connected
+
# @keepalive.cancel
@state = :stopped
@client.receive_data @error if @error
@client.unbind
end
##
# Called by the parser with parsed nodes
def receive(node) # :nodoc:
- LOG.debug "RECEIVING (#{node.element_name}) #{node}"
+ Blather.logger.debug "RECEIVING (#{node.element_name}) #{node}"
@node = node
- if @node.find_first('//stream:error', :stream => 'http://etherx.jabber.org/streams')
- handle_stream_error
- return
+ if @node.namespace && @node.namespace.prefix == 'stream'
+ case @node.element_name
+ when 'stream'
+ @state = :ready if @state == :stopped
+ return
+ when 'error'
+ handle_stream_error
+ return
+ when 'end'
+ stop
+ return
+ when 'features'
+ @state = :negotiating
+ @receiver = Features.new(
+ self,
+ proc { ready! },
+ proc { |err| @error = err; stop }
+ )
+ end
end
-
- case @node.element_name
- when 'stream'
- @state = :ready if @state == :stopped
-
- when 'stream:end'
- stop
-
- when 'features'
- @features = @node.children
- @state = :features
- dispatch
-
- else
- dispatch
-
- end
+ @receiver.receive_data @node.to_stanza
end
##
# Ensure the JID gets attached to the client
def jid=(new_jid) # :nodoc:
- LOG.debug "NEW JID: #{new_jid}"
+ Blather.logger.debug "NEW JID: #{new_jid}"
@jid = JID.new new_jid
@client.jid = @jid
end
protected
##
- # Dispatch based on current state
- def dispatch
- __send__ @state
- end
-
- ##
- # Start the stream
- # Each time the stream is started or re-started we need to kill off the old
- # parser so as not to confuse it
- def start
- end
-
- ##
# Stop the stream
def stop
unless @state == :stopped
@state = :stopped
send '</stream:stream>'
end
end
- ##
- # Called when @state == :stopped to start the stream
- # Counter intuitive, I know
- def stopped
- start
- end
-
- ##
- # Called when @state == :ready
- # Simply passes the stanza to the client
- def ready
- @client.receive_data @node.to_stanza
- end
-
def handle_stream_error
- @error = StreamError.import @node
+ @error = StreamError.import(@node)
stop
- @state = :error
end
- ##
- # Called when @state == :features
- # Runs through the list of features starting each one in turn
- def features
- feature = @features.first
- LOG.debug "FEATURE: #{feature}"
- @state = case feature ? feature.namespaces.default.href : nil
- when 'urn:ietf:params:xml:ns:xmpp-tls' then :establish_tls
- when 'urn:ietf:params:xml:ns:xmpp-sasl' then :authenticate_sasl
- when 'urn:ietf:params:xml:ns:xmpp-bind' then :bind_resource
- when 'urn:ietf:params:xml:ns:xmpp-session' then :establish_session
- else :ready
- end
-
- # Dispatch to the individual feature methods unless
- # feature negotiation is complete
- dispatch unless ready?
+ def ready!
+ @state = :started
+ @receiver = @client
+ @client.post_init
end
-
- ##
- # Start TLS
- def establish_tls
- unless @tls
- @tls = TLS.new self
- # on success destroy the TLS object and restart the stream
- @tls.on_success { LOG.debug "TLS: SUCCESS"; @tls = nil; start }
- # on failure stop the stream
- @tls.on_failure { |err| LOG.debug "TLS: FAILURE"; @error = err; stop }
-
- @node = @features.shift
- end
- @tls.handle @node
- end
-
- ##
- # Authenticate via SASL
- def authenticate_sasl
- unless @sasl
- @sasl = SASL.new(self, @jid, @pass)
- # on success destroy the SASL object and restart the stream
- @sasl.on_success { LOG.debug "SASL SUCCESS"; @sasl = nil; start }
- # on failure set the error and stop the stream
- @sasl.on_failure { |err| LOG.debug "SASL FAIL"; @error = err; stop }
-
- @node = @features.shift
- end
- @sasl.handle @node
- end
-
- ##
- # Bind to the resource provided by either the client or the server
- def bind_resource
- unless @resource
- @resource = Resource.new self, @jid
- # on success destroy the Resource object, set the jid, continue along the features dispatch process
- @resource.on_success { |jid| LOG.debug "RESOURCE: SUCCESS"; @resource = nil; self.jid = jid; @state = :features; dispatch }
- # on failure end the stream
- @resource.on_failure { |err| LOG.debug "RESOURCE: FAILURE"; @error = err; stop }
-
- @node = @features.shift
- end
- @resource.handle @node
- end
-
- ##
- # Establish the session between client and server
- def establish_session
- unless @session
- @session = Session.new self, @to
- # on success destroy the session object, let the client know the stream has been started
- # then continue the features dispatch process
- @session.on_success { LOG.debug "SESSION: SUCCESS"; @session = nil; @client.post_init; @state = :features; dispatch }
- # on failure end the stream
- @session.on_failure { |err| LOG.debug "SESSION: FAILURE"; @error = err; stop }
-
- @node = @features.shift
- end
- @session.handle @node
- end
end
-
end