# =XMPP4R - XMPP Library for Ruby # License:: Ruby's license (see the LICENSE file) or GNU GPL, at your option. # Website::http://home.gna.org/xmpp4r/ require 'xmpp4r/callbacks' require 'socket' require 'thread' require 'xmpp4r/semaphore' require 'xmpp4r/streamparser' require 'xmpp4r/presence' require 'xmpp4r/message' require 'xmpp4r/iq' require 'xmpp4r/debuglog' require 'xmpp4r/idgenerator' module Jabber ## # The stream class manages a connection stream (a file descriptor using which # XML messages are read and sent) # # You may register callbacks for the three Jabber stanzas # (message, presence and iq) and use the send and send_with_id # methods. # # To ensure the order of received stanzas, callback blocks are # launched in the parser thread. If further blocking operations # are intended in those callbacks, run your own thread there. class Stream DISCONNECTED = 1 CONNECTED = 2 # file descriptor used attr_reader :fd # connection status attr_reader :status # number of stanzas currently being processed attr_reader :processing ## # Initialize a new stream def initialize @fd = nil @status = DISCONNECTED @xmlcbs = CallbackList.new @stanzacbs = CallbackList.new @messagecbs = CallbackList.new @iqcbs = CallbackList.new @presencecbs = CallbackList.new @send_lock = Mutex.new @last_send = Time.now @exception_block = nil @tbcbmutex = Mutex.new @threadblocks = [] @wakeup_thread = nil @streamid = nil @streamns = 'jabber:client' @features_sem = Semaphore.new @parser_thread = nil @processing = 0 end ## # Start the XML parser on the fd def start(fd) @stream_mechanisms = [] @stream_features = {} @fd = fd @parser = StreamParser.new(@fd, self) @parser_thread = Thread.new do Thread.current.abort_on_exception = true begin @parser.parse Jabber::debuglog("DISCONNECTED\n") if @exception_block Thread.new { close!; @exception_block.call(nil, self, :disconnected) } else close! end rescue Exception => e Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}") if @exception_block Thread.new do Thread.current.abort_on_exception = true close @exception_block.call(e, self, :start) end else Jabber::warnlog "Exception caught in Parser thread! (#{e.class})\n#{e.backtrace.join("\n")}" close! raise end end end @status = CONNECTED end def stop @parser_thread.kill @parser = nil end ## # Mounts a block to handle exceptions if they occur during the # poll send. This will likely be the first indication that # the socket dropped in a Jabber Session. # # The block has to take three arguments: # * the Exception # * the Jabber::Stream object (self) # * a symbol where it happened, namely :start, :parser, :sending and :end def on_exception(&block) @exception_block = block end ## # This method is called by the parser when a failure occurs def parse_failure(e) Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}") # A new thread has to be created because close will cause the thread # to commit suicide(???) if @exception_block # New thread, because close will kill the current thread Thread.new do Thread.current.abort_on_exception = true close @exception_block.call(e, self, :parser) end else Jabber::warnlog "Stream#parse_failure was called by XML parser. Dumping " + "backtrace...\n" + e.exception + "\n#{e.backtrace.join("\n")}" close raise end end ## # This method is called by the parser upon receiving def parser_end if @exception_block Thread.new do Thread.current.abort_on_exception = true close @exception_block.call(nil, self, :close) end else close end end ## # Returns if this connection is connected to a Jabber service # return:: [Boolean] Connection status def is_connected? return @status == CONNECTED end ## # Returns if this connection is NOT connected to a Jabber service # # return:: [Boolean] Connection status def is_disconnected? return @status == DISCONNECTED end ## # Processes a received REXML::Element and executes # registered thread blocks and filters against it. # # element:: [REXML::Element] The received element def receive(element) @tbcbmutex.synchronize { @processing += 1 } Jabber::debuglog("RECEIVED:\n#{element.to_s}") if element.namespace('').to_s == '' # REXML namespaces are always strings element.add_namespace(@streamns) end case element.prefix when 'stream' case element.name when 'stream' stanza = element @streamid = element.attributes['id'] @streamns = element.namespace('') if element.namespace('') # Hack: component streams are basically client streams. # Someday we may want to create special stanza classes # for components/s2s deriving from normal stanzas but # posessing these namespaces @streamns = 'jabber:client' if @streamns == 'jabber:component:accept' unless element.attributes['version'] # isn't XMPP compliant, so Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features") @features_sem.run # don't wait for end when 'features' stanza = element element.each { |e| if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl' e.each_element('mechanism') { |mech| @stream_mechanisms.push(mech.text) } else @stream_features[e.name] = e.namespace end } Jabber::debuglog("FEATURES: received") @features_sem.run else stanza = element end else # Any stanza, classes are registered by XMPPElement::name_xmlns begin stanza = XMPPStanza::import(element) rescue NoNameXmlnsRegistered stanza = element end end if @xmlcbs.process(stanza) @tbcbmutex.synchronize { @processing -= 1 } return true end # Iterate through blocked threads (= waiting for an answer) # # We're dup'ping the @threadblocks here, so that we won't end up in an # endless loop if Stream#send is being nested. That means, the nested # threadblock won't receive the stanza currently processed, but the next # one. threadblocks = nil @tbcbmutex.synchronize do threadblocks = @threadblocks.dup end threadblocks.each { |threadblock| exception = nil r = false begin r = threadblock.call(stanza) rescue Exception => e exception = e end if r == true @tbcbmutex.synchronize do @threadblocks.delete(threadblock) end threadblock.wakeup @tbcbmutex.synchronize { @processing -= 1 } return true elsif exception @tbcbmutex.synchronize do @threadblocks.delete(threadblock) end threadblock.raise(exception) end } Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})") Jabber::debuglog("TRYING stanzacbs...") if @stanzacbs.process(stanza) @tbcbmutex.synchronize { @processing -= 1 } return true end r = false Jabber::debuglog("TRYING message/iq/presence/cbs...") case stanza when Message r = @messagecbs.process(stanza) when Iq r = @iqcbs.process(stanza) when Presence r = @presencecbs.process(stanza) end @tbcbmutex.synchronize { @processing -= 1 } return r end ## # Get the list of iq callbacks. def iq_callbacks @iqcbs end ## # Get the list of message callbacks. def message_callbacks @messagecbs end ## # Get the list of presence callbacks. def presence_callbacks @presencecbs end ## # Get the list of stanza callbacks. def stanza_callbacks @stanzacbs end ## # Get the list of xml callbacks. def xml_callbacks @xmlcbs end ## # This is used by Jabber::Stream internally to # keep track of any blocks which were passed to # Stream#send. class ThreadBlock def initialize(block) @block = block @waiter = Semaphore.new @exception = nil end def call(*args) @block.call(*args) end def wait @waiter.wait raise @exception if @exception end def wakeup @waiter.run end def raise(exception) @exception = exception @waiter.run end end def send_data(data) @send_lock.synchronize do @last_send = Time.now @fd << data @fd.flush end end ## # Sends XML data to the socket and (optionally) waits # to process received data. # # Do not invoke this in a callback but in a seperate thread # because we may not suspend the parser-thread (in whose # context callbacks are executed). # # xml:: [String] The xml data to send # &block:: [Block] The optional block def send(xml, &block) Jabber::debuglog("SENDING:\n#{xml}") if block threadblock = ThreadBlock.new(block) @tbcbmutex.synchronize do @threadblocks.unshift(threadblock) end end begin # Temporarily remove stanza's namespace to # reduce bandwidth consumption if xml.kind_of? XMPPStanza and xml.namespace == 'jabber:client' and xml.prefix != 'stream' and xml.name != 'stream' xml.delete_namespace send_data(xml.to_s) xml.add_namespace(@streamns) else send_data(xml.to_s) end rescue Exception => e Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}") if @exception_block Thread.new do Thread.current.abort_on_exception = true close! @exception_block.call(e, self, :sending) end else Jabber::warnlog "Exception caught while sending! (#{e.class})\n#{e.backtrace.join("\n")}" close! raise end end # The parser thread might be running this (think of a callback running send()) # If this is the case, we mustn't stop (or we would cause a deadlock) if block and Thread.current != @parser_thread threadblock.wait elsif block Jabber::warnlog("WARNING:\nCannot stop current thread in Jabber::Stream#send because it is the parser thread!") end end ## # Send an XMMP stanza with an Jabber::XMPPStanza#id. The id will be # generated by Jabber::IdGenerator if not already set. # # The block will be called once: when receiving a stanza with the # same Jabber::XMPPStanza#id. There is no need to return true to # complete this! Instead the return value of the block will be # returned. This is a direct result of unique request/response # stanza identification via the id attribute. # # The block may be omitted. Then, the result will be the response # stanza. # # Be aware that if a stanza with type='error' is received # the function does not yield but raises an ServerError with # the corresponding error element. # # Please see Stream#send for some implementational details. # # Please read the note about nesting at Stream#send # xml:: [XMPPStanza] def send_with_id(xml, &block) if xml.id.nil? xml.id = Jabber::IdGenerator.instance.generate_id end res = nil error = nil send(xml) do |received| if received.kind_of? XMPPStanza and received.id == xml.id if received.type == :error error = (received.error ? received.error : ErrorResponse.new) true elsif block_given? res = yield(received) true else res = received true end else false end end unless error.nil? raise ServerError.new(error) end res end ## # Adds a callback block to process received XML messages, these # will be handled before any blocks given to Stream#send or other # callbacks. # # priority:: [Integer] The callback's priority, the higher, the sooner # ref:: [String] The callback's reference # &block:: [Block] The optional block def add_xml_callback(priority = 0, ref = nil, &block) @tbcbmutex.synchronize do @xmlcbs.add(priority, ref, block) end end ## # Delete an XML-messages callback # # ref:: [String] The reference of the callback to delete def delete_xml_callback(ref) @tbcbmutex.synchronize do @xmlcbs.delete(ref) end end ## # Adds a callback block to process received Messages # # priority:: [Integer] The callback's priority, the higher, the sooner # ref:: [String] The callback's reference # &block:: [Block] The optional block def add_message_callback(priority = 0, ref = nil, &block) @tbcbmutex.synchronize do @messagecbs.add(priority, ref, block) end end ## # Delete an Message callback # # ref:: [String] The reference of the callback to delete def delete_message_callback(ref) @tbcbmutex.synchronize do @messagecbs.delete(ref) end end ## # Adds a callback block to process received Stanzas # # priority:: [Integer] The callback's priority, the higher, the sooner # ref:: [String] The callback's reference # &block:: [Block] The optional block def add_stanza_callback(priority = 0, ref = nil, &block) @tbcbmutex.synchronize do @stanzacbs.add(priority, ref, block) end end ## # Delete a Stanza callback # # ref:: [String] The reference of the callback to delete def delete_stanza_callback(ref) @tbcbmutex.synchronize do @stanzacbs.delete(ref) end end ## # Adds a callback block to process received Presences # # priority:: [Integer] The callback's priority, the higher, the sooner # ref:: [String] The callback's reference # &block:: [Block] The optional block def add_presence_callback(priority = 0, ref = nil, &block) @tbcbmutex.synchronize do @presencecbs.add(priority, ref, block) end end ## # Delete a Presence callback # # ref:: [String] The reference of the callback to delete def delete_presence_callback(ref) @tbcbmutex.synchronize do @presencecbs.delete(ref) end end ## # Adds a callback block to process received Iqs # # priority:: [Integer] The callback's priority, the higher, the sooner # ref:: [String] The callback's reference # &block:: [Block] The optional block def add_iq_callback(priority = 0, ref = nil, &block) @tbcbmutex.synchronize do @iqcbs.add(priority, ref, block) end end ## # Delete an Iq callback # # ref:: [String] The reference of the callback to delete # def delete_iq_callback(ref) @tbcbmutex.synchronize do @iqcbs.delete(ref) end end ## # Closes the connection to the Jabber service def close close! end def close! pr = 1 n = 0 # In some cases, we might lost count of some stanzas # (for example, if the handler raises an exception) # so we can't block forever. while pr > 0 and n <= 20 @tbcbmutex.synchronize { pr = @processing } if pr > 0 n += 1 Jabber::debuglog("TRYING TO CLOSE, STILL PROCESSING #{pr} STANZAS") #puts("TRYING TO CLOSE, STILL PROCESSING #{pr} STANZAS") sleep 0.1 end end # Order Matters here! If this method is called from within # @parser_thread then killing @parser_thread first would # mean the other parts of the method fail to execute. # That would be bad. So kill parser_thread last @fd.close if @fd and !@fd.closed? @status = DISCONNECTED @parser_thread.kill if @parser_thread end end end