lib/punchblock/connection.rb in punchblock-0.4.3 vs lib/punchblock/connection.rb in punchblock-0.5.0
- old
+ new
@@ -1,210 +1,8 @@
-%w{
- timeout
- blather/client/dsl
- punchblock/core_ext/blather/stanza
- punchblock/core_ext/blather/stanza/presence
-}.each { |f| require f }
-
module Punchblock
- class Connection < GenericConnection
- include Blather::DSL
+ module Connection
+ extend ActiveSupport::Autoload
- ##
- # Initialize the required connection attributes
- #
- # @param [Hash] options
- # @option options [String] :username client JID
- # @option options [String] :password XMPP password
- # @option options [String] :rayo_domain the domain on which Rayo is running
- # @option options [Logger] :wire_logger to which all XMPP transactions will be logged
- # @option options [Boolean, Optional] :auto_reconnect whether or not to auto reconnect
- # @option options [Numeric, Optional] :write_timeout for which to wait on a command response
- # @option options [Numeric, nil, Optional] :ping_period interval in seconds on which to ping the server. Nil or false to disable
- #
- def initialize(options = {})
- super
-
- raise ArgumentError unless (@username = options[:username]) && options[:password]
-
- setup *[:username, :password, :host, :port, :certs].map { |key| options.delete key }
-
- @rayo_domain = options[:rayo_domain] || Blather::JID.new(@username).domain
-
- @callmap = {} # This hash maps call IDs to their XMPP domain.
-
- @component_id_to_iq_id = {}
- @iq_id_to_command = {}
-
- @auto_reconnect = !!options[:auto_reconnect]
- @reconnect_attempts = 0
-
- @write_timeout = options[:write_timeout] || 3
-
- @ping_period = options.has_key?(:ping_period) ? options[:ping_period] : 60
-
- Blather.logger = options.delete(:wire_logger) if options.has_key?(:wire_logger)
- end
-
- ##
- # Write a command to the Rayo server for a particular call
- #
- # @param [String] call the call ID on which to act
- # @param [CommandNode] cmd the command to execute on the call
- # @param [String, Optional] component_id the component_id on which to execute
- #
- # @raise Exception if there is a server-side error
- #
- # @return true
- #
- def write(call_id, cmd, component_id = nil)
- async_write call_id, cmd, component_id
- cmd.response(@write_timeout).tap { |result| raise result if result.is_a? Exception }
- end
-
- ##
- # @return [Queue] Pop this queue to determine result of command execution. Will be true or an exception
- def async_write(call_id, cmd, component_id = nil)
- iq = prep_command_for_execution call_id, cmd, component_id
- write_to_stream iq
- cmd.request!
- end
-
- def prep_command_for_execution(call_id, cmd, component_id = nil)
- cmd.connection = self
- cmd.call_id = call_id
- jid = cmd.is_a?(Command::Dial) ? @rayo_domain : "#{call_id}@#{@callmap[call_id]}"
- jid << "/#{component_id}" if component_id
- create_iq(jid).tap do |iq|
- @logger.debug "Sending IQ ID #{iq.id} #{cmd.inspect} to #{jid}" if @logger
- iq << cmd
- @iq_id_to_command[iq.id] = cmd
- end
- end
-
- ##
- # Fire up the connection
- #
- def run
- register_handlers
- connect
- end
-
- def connect
- begin
- EM.run { client.run }
- rescue Blather::SASLError, Blather::StreamError => e
- raise ProtocolError.new(e.class.to_s, e.message)
- end
- end
-
- def stop
- @reconnect_attempts = nil
- client.close
- end
-
- def connected?
- client.connected?
- end
-
- ##
- #
- # Get the original command issued by command ID
- #
- # @param [String] component_id
- #
- # @return [RayoNode]
- #
- def original_component_from_id(component_id)
- @iq_id_to_command[@component_id_to_iq_id[component_id]]
- end
-
- def record_command_id_for_iq_id(command_id, iq_id)
- @component_id_to_iq_id[command_id] = iq_id
- end
-
- private
-
- def handle_presence(p)
- throw :pass unless p.rayo_event? && p.from.domain == @rayo_domain
- @logger.info "Receiving event for call ID #{p.call_id}" if @logger
- @callmap[p.call_id] = p.from.domain
- @logger.debug p.inspect if @logger
- event = p.event
- event.connection = self
- if event.source
- event.source.add_event event
- else
- @event_queue.push event
- end
- end
-
- def handle_iq_result(iq)
- # FIXME: Do we need to raise a warning if the domain changes?
- throw :pass unless command = @iq_id_to_command[iq.id]
- @callmap[iq.from.node] = iq.from.domain
- @logger.debug "Command #{iq.id} completed successfully" if @logger
- command.response = iq
- end
-
- def handle_error(iq)
- e = Blather::StanzaError.import iq
-
- protocol_error = ProtocolError.new e.name, e.text, iq.call_id, iq.component_id
-
- throw :pass unless command = @iq_id_to_command[iq.id]
-
- command.response = protocol_error
- end
-
- def register_handlers
- # Push a message to the queue and the log that we connected
- when_ready do
- @event_queue.push connected
- @logger.info "Connected to XMPP as #{@username}" if @logger
- @reconnect_attempts = 0
- @rayo_ping = EM::PeriodicTimer.new(@ping_period) { ping_rayo } if @ping_period
- end
-
- disconnected do
- @rayo_ping.cancel if @rayo_ping
- if @auto_reconnect && @reconnect_attempts
- timer = 30 * 2 ** @reconnect_attempts
- @logger.warn "XMPP disconnected. Tried to reconnect #{@reconnect_attempts} times. Reconnecting in #{timer}s." if @logger
- sleep timer
- @logger.info "Trying to reconnect..." if @logger
- @reconnect_attempts += 1
- connect
- end
- end
-
- # Read/handle call control messages. These are mostly just acknowledgement of commands
- iq :result? do |msg|
- handle_iq_result msg
- end
-
- # Read/handle error IQs
- iq :error? do |e|
- handle_error e
- end
-
- # Read/handle presence requests. This is how we get events.
- presence do |msg|
- handle_presence msg
- end
- end
-
- def ping_rayo
- client.write_with_handler Blather::Stanza::Iq::Ping.new(:set, @rayo_domain) do |response|
- begin
- handle_error response if response.is_a? Blather::BlatherError
- rescue ProtocolError => e
- raise e unless e.name == :feature_not_implemented
- end
- end
- end
-
- def create_iq(jid = nil)
- Blather::Stanza::Iq.new :set, jid || @call_id
- end
+ autoload :Connected
+ autoload :XMPP
end
end