lib/blather/client/client.rb in blather-1.0.0 vs lib/blather/client/client.rb in blather-1.1.0

- old
+ new

@@ -32,39 +32,42 @@ # end # class Client attr_reader :jid, :roster, - :caps + :caps, + :queue_size # Create a new client and set it up # # @param [Blather::JID, #to_s] jid the JID to authorize with # @param [String] password the password to authorize with # @param [String] host if this isn't set it'll be resolved off the JID's # domain # @param [Fixnum, String] port the port to connect to. + # @param [Hash] options a list of options to create the client with + # @option options [Number] :workqueue_count (5) the number of threads used to process incoming XMPP messages. + # If this parameter is specified with 0, no background threads are used; + # instead stanzas are handled in the same process that the Client is running in. # # @return [Blather::Client] - def self.setup(jid, password, host = nil, port = nil, certs = nil, connect_timeout = nil) - self.new.setup(jid, password, host, port, certs, connect_timeout) + def self.setup(jid, password, host = nil, port = nil, certs = nil, connect_timeout = nil, options = {}) + self.new.setup(jid, password, host, port, certs, connect_timeout, options) end + def initialize # @private @state = :initializing @status = Stanza::Presence::Status.new @handlers = {} @tmp_handlers = {} @filters = {:before => [], :after => []} @roster = Roster.new self @caps = Stanza::Capabilities.new + @queue_size = 5 - @handler_queue = GirlFriday::WorkQueue.new :handle_stanza, :size => 5 do |stanza| - handle_data stanza - end - setup_initial_handlers end # Check whether the client is currently connected. def connected? @@ -166,11 +169,15 @@ write stanza end # Close the connection def close - EM.next_tick { self.stream.close_connection_after_writing } + EM.next_tick { + handler_queue.shutdown if handler_queue + @handler_queue = nil + self.stream.close_connection_after_writing if connected? + } end # @private def post_init(stream, jid = nil) @stream = stream @@ -183,11 +190,15 @@ call_handler_for(:disconnected, nil) || (EM.reactor_running? && EM.stop) end # @private def receive_data(stanza) - @handler_queue << stanza + if handler_queue + handler_queue << stanza + else + handle_data stanza + end end def handle_data(stanza) catch(:halt) do run_filters :before, stanza @@ -200,20 +211,29 @@ def setup? @setup.is_a? Array end # @private - def setup(jid, password, host = nil, port = nil, certs = nil, connect_timeout = nil) + def setup(jid, password, host = nil, port = nil, certs = nil, connect_timeout = nil, options = {}) @jid = JID.new(jid) @setup = [@jid, password] @setup << host @setup << port @setup << certs @setup << connect_timeout + @queue_size = options[:workqueue_count] || 5 self end + # @private + def handler_queue + return if queue_size == 0 + @handler_queue ||= GirlFriday::WorkQueue.new :handle_stanza, :size => queue_size do |stanza| + handle_data stanza + end + end + protected def stream @stream || raise('Stream not ready!') end @@ -255,10 +275,10 @@ call_handler_for :ready, nil end def client_post_init write_with_handler Stanza::Iq::Roster.new do |node| - roster.process node + roster.process(node) unless node.error? write @status ready! end end