lib/ably/realtime/client/outgoing_message_dispatcher.rb in ably-0.1.6 vs lib/ably/realtime/client/outgoing_message_dispatcher.rb in ably-0.2.0
- old
+ new
@@ -9,10 +9,11 @@
ACTION = Ably::Models::ProtocolMessage::ACTION
def initialize(client, connection)
@client = client
@connection = connection
+
subscribe_to_outgoing_protocol_message_queue
setup_event_handlers
end
private
@@ -32,22 +33,30 @@
def pending_queue
connection.__pending_message_queue__
end
+ def current_transport_outgoing_message_bus
+ connection.transport.__outgoing_protocol_msgbus__
+ end
+
def deliver_queued_protocol_messages
condition = -> { can_send_messages? && messages_in_outgoing_queue? }
non_blocking_loop_while(condition) do
protocol_message = outgoing_queue.shift
- pending_queue << protocol_message if protocol_message.ack_required?
- connection.transport.send_object protocol_message
- client.logger.debug "Prot msg sent =>: #{protocol_message.action} #{protocol_message}"
+ current_transport_outgoing_message_bus.publish :protocol_message, protocol_message
+
+ if protocol_message.ack_required?
+ pending_queue << protocol_message
+ else
+ protocol_message.succeed protocol_message
+ end
end
end
def subscribe_to_outgoing_protocol_message_queue
- connection.__outgoing_protocol_msgbus__.subscribe(:message) do |*args|
+ connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |*args|
deliver_queued_protocol_messages
end
end
def setup_event_handlers