lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.7.3 vs lib/submodules/ably-ruby/lib/ably/realtime/connection.rb in ably-rest-0.7.5
- old
+ new
@@ -35,10 +35,11 @@
# @return [Ably::Realtime::Connection::STATE] connection state
#
class Connection
include Ably::Modules::EventEmitter
include Ably::Modules::Conversions
+ include Ably::Modules::SafeYield
extend Ably::Modules::Enum
# Valid Connection states
STATE = ruby_enum('STATE',
:initialized,
@@ -92,18 +93,18 @@
attr_reader :__outgoing_message_queue__
# An internal queue used to manage sent messages. You should never interface with this array directly
# @return [Array]
# @api private
- attr_reader :__pending_message_queue__
+ attr_reader :__pending_message_ack_queue__
# @api public
def initialize(client)
- @client = client
- @client_serial = -1
- @__outgoing_message_queue__ = []
- @__pending_message_queue__ = []
+ @client = client
+ @client_serial = -1
+ @__outgoing_message_queue__ = []
+ @__pending_message_ack_queue__ = []
Client::IncomingMessageDispatcher.new client, self
Client::OutgoingMessageDispatcher.new client, self
@state_machine = ConnectionStateMachine.new(self)
@@ -154,21 +155,21 @@
# puts "Ping took #{ms_elapsed}ms"
# end
#
# @return [void]
#
- def ping
+ def ping(&block)
raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized?
raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed?
started = nil
wait_for_ping = Proc.new do |protocol_message|
if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat
__incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
time_passed = (Time.now.to_f * 1000 - started.to_f * 1000).to_i
- yield time_passed if block_given?
+ safe_yield block, time_passed if block_given?
end
end
once_or_if(STATE.Connected) do
started = Time.now
@@ -200,11 +201,11 @@
# @return [String] recovery key that can be used by another client to recover this connection with the :recover option
def recovery_key
"#{key}:#{serial}" if connection_resumable?
end
- # Following a new connection being made, resumed or recovered, the connection ID, connection key
+ # Following a new connection being made, the connection ID, connection key
# and message serial need to match the details provided by the server.
#
# @return [void]
# @api private
def configure_new(connection_id, connection_key, connection_serial)
@@ -288,11 +289,11 @@
# @param [Ably::Models::ProtocolMessage] protocol_message
# @return [void]
# @api private
def send_protocol_message(protocol_message)
add_message_serial_if_ack_required_to(protocol_message) do
- Ably::Models::ProtocolMessage.new(protocol_message).tap do |protocol_message|
+ Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |protocol_message|
add_message_to_outgoing_queue protocol_message
notify_message_dispatcher_of_new_message protocol_message
logger.debug("Connection: Prot msg queued =>: #{protocol_message.action} #{protocol_message}")
end
end
@@ -360,10 +361,33 @@
# @api private
def set_failed_connection_error_reason(error)
@error_reason = error
end
+ # @api private
+ def clear_error_reason
+ @error_reason = nil
+ end
+
+ # Triggers registered callbacks for a successful connection resume event
+ # @api private
+ def resumed
+ resume_callbacks.each(&:call)
+ end
+
+ # Provides a simple hook to inject a callback when a connection is successfully resumed
+ # @api private
+ def on_resume(&callback)
+ resume_callbacks << callback
+ end
+
+ # Remove a registered connection resume callback
+ # @api private
+ def off_resume(&callback)
+ resume_callbacks.delete(callback)
+ end
+
# As we are using a state machine, do not allow change_state to be used
# #transition_state_machine must be used instead
private :change_state
private
@@ -376,10 +400,14 @@
# A connection serial guarantees the server has received the message and is thus used for connection
# recovery and resumes.
# @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent
attr_reader :client_serial
+ def resume_callbacks
+ @resume_callbacks ||= []
+ end
+
def create_pub_sub_message_bus
Ably::Util::PubSub.new(
coerce_into: Proc.new do |event|
raise KeyError, "Expected :protocol_message, :#{event} is disallowed" unless event == :protocol_message
:protocol_message
@@ -441,5 +469,9 @@
previous_state == state && manager.retry_count_for_state(state) >= first_attempt_count
end
end
end
end
+
+require 'ably/realtime/connection/connection_manager'
+require 'ably/realtime/connection/connection_state_machine'
+require 'ably/realtime/connection/websocket_transport'