Sha256: 01a9b0a98e656dafc5b25b40719ff722bd1f55ad06b9326fe5f90eb1e2562059

Contents?: true

Size: 1.93 KB

Versions: 4

Compression:

Stored size: 1.93 KB

Contents

module Ably::Realtime
  class Client
    # OutgoingMessageDispatcher is a (private) class that is used to deliver
    # outgoing {Ably::Models::ProtocolMessage}s using the {Ably::Realtime::Connection}
    # when the connection state is capable of delivering messages
    class OutgoingMessageDispatcher
      include Ably::Modules::EventMachineHelpers

      ACTION = Ably::Models::ProtocolMessage::ACTION

      def initialize(client, connection)
        @client     = client
        @connection = connection

        subscribe_to_outgoing_protocol_message_queue
        setup_event_handlers
      end

      private
      attr_reader :client, :connection

      def can_send_messages?
        connection.connected? || connection.closing?
      end

      def messages_in_outgoing_queue?
        !outgoing_queue.empty?
      end

      def outgoing_queue
        connection.__outgoing_message_queue__
      end

      def pending_queue
        connection.__pending_message_queue__
      end

      def current_transport_outgoing_message_bus
        connection.transport.__outgoing_protocol_msgbus__
      end

      def deliver_queued_protocol_messages
        condition = -> { can_send_messages? && messages_in_outgoing_queue? }

        non_blocking_loop_while(condition) do
          protocol_message = outgoing_queue.shift
          current_transport_outgoing_message_bus.publish :protocol_message, protocol_message

          if protocol_message.ack_required?
            pending_queue << protocol_message
          else
            protocol_message.succeed protocol_message
          end
        end
      end

      def subscribe_to_outgoing_protocol_message_queue
        connection.__outgoing_protocol_msgbus__.subscribe(:protocol_message) do |*args|
          deliver_queued_protocol_messages
        end
      end

      def setup_event_handlers
        connection.on(:connected) do
          deliver_queued_protocol_messages
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 2 rubygems

Version Path
ably-rest-0.7.3 lib/submodules/ably-ruby/lib/ably/realtime/client/outgoing_message_dispatcher.rb
ably-0.7.2 lib/ably/realtime/client/outgoing_message_dispatcher.rb
ably-0.7.1 lib/ably/realtime/client/outgoing_message_dispatcher.rb
ably-0.7.0 lib/ably/realtime/client/outgoing_message_dispatcher.rb