lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.7.3 vs lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.7.5

- old
+ new

@@ -35,10 +35,11 @@ # @return [Ably::Realtime::Connection::STATE] connection state # class Connection include Ably::Modules::EventEmitter include Ably::Modules::Conversions + include Ably::Modules::SafeYield extend Ably::Modules::Enum # Valid Connection states STATE = ruby_enum('STATE', :initialized, @@ -92,18 +93,18 @@ attr_reader :__outgoing_message_queue__ # An internal queue used to manage sent messages. You should never interface with this array directly # @return [Array] # @api private - attr_reader :__pending_message_queue__ + attr_reader :__pending_message_ack_queue__ # @api public def initialize(client) - @client = client - @client_serial = -1 - @__outgoing_message_queue__ = [] - @__pending_message_queue__ = [] + @client = client + @client_serial = -1 + @__outgoing_message_queue__ = [] + @__pending_message_ack_queue__ = [] Client::IncomingMessageDispatcher.new client, self Client::OutgoingMessageDispatcher.new client, self @state_machine = ConnectionStateMachine.new(self) @@ -154,21 +155,21 @@ # puts "Ping took #{ms_elapsed}ms" # end # # @return [void] # - def ping + def ping(&block) raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized? raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed? started = nil wait_for_ping = Proc.new do |protocol_message| if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping) time_passed = (Time.now.to_f * 1000 - started.to_f * 1000).to_i - yield time_passed if block_given? + safe_yield block, time_passed if block_given? end end once_or_if(STATE.Connected) do started = Time.now @@ -200,11 +201,11 @@ # @return [String] recovery key that can be used by another client to recover this connection with the :recover option def recovery_key "#{key}:#{serial}" if connection_resumable? end - # Following a new connection being made, resumed or recovered, the connection ID, connection key + # Following a new connection being made, the connection ID, connection key # and message serial need to match the details provided by the server. # # @return [void] # @api private def configure_new(connection_id, connection_key, connection_serial) @@ -288,11 +289,11 @@ # @param [Ably::Models::ProtocolMessage] protocol_message # @return [void] # @api private def send_protocol_message(protocol_message) add_message_serial_if_ack_required_to(protocol_message) do - Ably::Models::ProtocolMessage.new(protocol_message).tap do |protocol_message| + Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |protocol_message| add_message_to_outgoing_queue protocol_message notify_message_dispatcher_of_new_message protocol_message logger.debug("Connection: Prot msg queued =>: #{protocol_message.action} #{protocol_message}") end end @@ -360,10 +361,33 @@ # @api private def set_failed_connection_error_reason(error) @error_reason = error end + # @api private + def clear_error_reason + @error_reason = nil + end + + # Triggers registered callbacks for a successful connection resume event + # @api private + def resumed + resume_callbacks.each(&:call) + end + + # Provides a simple hook to inject a callback when a connection is successfully resumed + # @api private + def on_resume(&callback) + resume_callbacks << callback + end + + # Remove a registered connection resume callback + # @api private + def off_resume(&callback) + resume_callbacks.delete(callback) + end + # As we are using a state machine, do not allow change_state to be used # #transition_state_machine must be used instead private :change_state private @@ -376,10 +400,14 @@ # A connection serial guarantees the server has received the message and is thus used for connection # recovery and resumes. # @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent attr_reader :client_serial + def resume_callbacks + @resume_callbacks ||= [] + end + def create_pub_sub_message_bus Ably::Util::PubSub.new( coerce_into: Proc.new do |event| raise KeyError, "Expected :protocol_message, :#{event} is disallowed" unless event == :protocol_message :protocol_message @@ -441,5 +469,9 @@ previous_state == state && manager.retry_count_for_state(state) >= first_attempt_count end end end end + +require 'ably/realtime/connection/connection_manager' +require 'ably/realtime/connection/connection_state_machine' +require 'ably/realtime/connection/websocket_transport'