lib/blather/client/client.rb in blather-1.0.0 vs lib/blather/client/client.rb in blather-1.1.0
- old
+ new
@@ -32,39 +32,42 @@
# end
#
class Client
attr_reader :jid,
:roster,
- :caps
+ :caps,
+ :queue_size
# Create a new client and set it up
#
# @param [Blather::JID, #to_s] jid the JID to authorize with
# @param [String] password the password to authorize with
# @param [String] host if this isn't set it'll be resolved off the JID's
# domain
# @param [Fixnum, String] port the port to connect to.
+ # @param [Hash] options a list of options to create the client with
+ # @option options [Number] :workqueue_count (5) the number of threads used to process incoming XMPP messages.
+ # If this parameter is specified with 0, no background threads are used;
+ # instead stanzas are handled in the same process that the Client is running in.
#
# @return [Blather::Client]
- def self.setup(jid, password, host = nil, port = nil, certs = nil, connect_timeout = nil)
- self.new.setup(jid, password, host, port, certs, connect_timeout)
+ def self.setup(jid, password, host = nil, port = nil, certs = nil, connect_timeout = nil, options = {})
+ self.new.setup(jid, password, host, port, certs, connect_timeout, options)
end
+
def initialize # @private
@state = :initializing
@status = Stanza::Presence::Status.new
@handlers = {}
@tmp_handlers = {}
@filters = {:before => [], :after => []}
@roster = Roster.new self
@caps = Stanza::Capabilities.new
+ @queue_size = 5
- @handler_queue = GirlFriday::WorkQueue.new :handle_stanza, :size => 5 do |stanza|
- handle_data stanza
- end
-
setup_initial_handlers
end
# Check whether the client is currently connected.
def connected?
@@ -166,11 +169,15 @@
write stanza
end
# Close the connection
def close
- EM.next_tick { self.stream.close_connection_after_writing }
+ EM.next_tick {
+ handler_queue.shutdown if handler_queue
+ @handler_queue = nil
+ self.stream.close_connection_after_writing if connected?
+ }
end
# @private
def post_init(stream, jid = nil)
@stream = stream
@@ -183,11 +190,15 @@
call_handler_for(:disconnected, nil) || (EM.reactor_running? && EM.stop)
end
# @private
def receive_data(stanza)
- @handler_queue << stanza
+ if handler_queue
+ handler_queue << stanza
+ else
+ handle_data stanza
+ end
end
def handle_data(stanza)
catch(:halt) do
run_filters :before, stanza
@@ -200,20 +211,29 @@
def setup?
@setup.is_a? Array
end
# @private
- def setup(jid, password, host = nil, port = nil, certs = nil, connect_timeout = nil)
+ def setup(jid, password, host = nil, port = nil, certs = nil, connect_timeout = nil, options = {})
@jid = JID.new(jid)
@setup = [@jid, password]
@setup << host
@setup << port
@setup << certs
@setup << connect_timeout
+ @queue_size = options[:workqueue_count] || 5
self
end
+ # @private
+ def handler_queue
+ return if queue_size == 0
+ @handler_queue ||= GirlFriday::WorkQueue.new :handle_stanza, :size => queue_size do |stanza|
+ handle_data stanza
+ end
+ end
+
protected
def stream
@stream || raise('Stream not ready!')
end
@@ -255,10 +275,10 @@
call_handler_for :ready, nil
end
def client_post_init
write_with_handler Stanza::Iq::Roster.new do |node|
- roster.process node
+ roster.process(node) unless node.error?
write @status
ready!
end
end