lib/ably/realtime/client/outgoing_message_dispatcher.rb in ably-0.1.5 vs lib/ably/realtime/client/outgoing_message_dispatcher.rb in ably-0.1.6
- old
+ new
@@ -1,28 +1,25 @@
module Ably::Realtime
class Client
# OutgoingMessageDispatcher is a (private) class that is used to deliver
- # outgoing {Ably::Realtime::Models::ProtocolMessage}s using the {Ably::Realtime::Connection}
+ # outgoing {Ably::Models::ProtocolMessage}s using the {Ably::Realtime::Connection}
# when the connection state is capable of delivering messages
class OutgoingMessageDispatcher
include Ably::Modules::EventMachineHelpers
- ACTION = Models::ProtocolMessage::ACTION
+ ACTION = Ably::Models::ProtocolMessage::ACTION
- def initialize(client)
- @client = client
+ def initialize(client, connection)
+ @client = client
+ @connection = connection
subscribe_to_outgoing_protocol_message_queue
setup_event_handlers
end
private
- attr_reader :client
+ attr_reader :client, :connection
- def connection
- client.connection
- end
-
def can_send_messages?
connection.connected?
end
def messages_in_outgoing_queue?
@@ -37,14 +34,15 @@
connection.__pending_message_queue__
end
def deliver_queued_protocol_messages
condition = -> { can_send_messages? && messages_in_outgoing_queue? }
+
non_blocking_loop_while(condition) do
protocol_message = outgoing_queue.shift
pending_queue << protocol_message if protocol_message.ack_required?
- connection.send_text(protocol_message.to_json)
- client.logger.debug("Prot msg sent =>: #{protocol_message.action} #{protocol_message}")
+ connection.transport.send_object protocol_message
+ client.logger.debug "Prot msg sent =>: #{protocol_message.action} #{protocol_message}"
end
end
def subscribe_to_outgoing_protocol_message_queue
connection.__outgoing_protocol_msgbus__.subscribe(:message) do |*args|