lib/ably/realtime/client/outgoing_message_dispatcher.rb in ably-0.1.5 vs lib/ably/realtime/client/outgoing_message_dispatcher.rb in ably-0.1.6

- old
+ new

@@ -1,28 +1,25 @@ module Ably::Realtime class Client # OutgoingMessageDispatcher is a (private) class that is used to deliver - # outgoing {Ably::Realtime::Models::ProtocolMessage}s using the {Ably::Realtime::Connection} + # outgoing {Ably::Models::ProtocolMessage}s using the {Ably::Realtime::Connection} # when the connection state is capable of delivering messages class OutgoingMessageDispatcher include Ably::Modules::EventMachineHelpers - ACTION = Models::ProtocolMessage::ACTION + ACTION = Ably::Models::ProtocolMessage::ACTION - def initialize(client) - @client = client + def initialize(client, connection) + @client = client + @connection = connection subscribe_to_outgoing_protocol_message_queue setup_event_handlers end private - attr_reader :client + attr_reader :client, :connection - def connection - client.connection - end - def can_send_messages? connection.connected? end def messages_in_outgoing_queue? @@ -37,14 +34,15 @@ connection.__pending_message_queue__ end def deliver_queued_protocol_messages condition = -> { can_send_messages? && messages_in_outgoing_queue? } + non_blocking_loop_while(condition) do protocol_message = outgoing_queue.shift pending_queue << protocol_message if protocol_message.ack_required? - connection.send_text(protocol_message.to_json) - client.logger.debug("Prot msg sent =>: #{protocol_message.action} #{protocol_message}") + connection.transport.send_object protocol_message + client.logger.debug "Prot msg sent =>: #{protocol_message.action} #{protocol_message}" end end def subscribe_to_outgoing_protocol_message_queue connection.__outgoing_protocol_msgbus__.subscribe(:message) do |*args|