lib/ably/realtime/channel.rb in ably-0.1.4 vs lib/ably/realtime/channel.rb in ably-0.1.5
- old
+ new
@@ -1,96 +1,157 @@
module Ably
module Realtime
+ # The Channel class represents a Channel belonging to this application.
+ # The Channel instance allows messages to be published and
+ # received, and controls the lifecycle of this instance's
+ # attachment to the channel.
+ #
+ # Channels will always be in one of the following states:
+ #
+ # initialized: 0
+ # attaching: 1
+ # attached: 2
+ # detaching: 3
+ # detached: 4
+ # failed: 5
+ #
+ # Note that the states are available as Enum-like constants:
+ #
+ # Channel::STATE.Initialized
+ # Channel::STATE.Attaching
+ # Channel::STATE.Attached
+ # Channel::STATE.Detaching
+ # Channel::STATE.Detached
+ # Channel::STATE.Failed
+ #
+ # @!attribute [r] state
+ # @return {Ably::Realtime::Connection::STATE} channel state
+ #
class Channel
include Ably::Modules::Conversions
- include Callbacks
+ include Ably::Modules::EventEmitter
+ include Ably::Modules::EventMachineHelpers
+ extend Ably::Modules::Enum
- STATES = {
- initialised: 1,
- attaching: 2,
- attached: 3,
- detaching: 4,
- detached: 5,
- failed: 6
- }.freeze
+ STATE = ruby_enum('STATE',
+ :initialized,
+ :attaching,
+ :attached,
+ :detaching,
+ :detached,
+ :failed
+ )
+ include Ably::Modules::State
+ # Max number of messages to bundle in a single ProtocolMessage
+ MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50
+
attr_reader :client, :name
- # Retrieve a state symbol by the integer value
- def self.state_sym_for(state_int)
- @states_index_by_int ||= STATES.invert.freeze
- @states_index_by_int[state_int]
+ def initialize(client, name)
+ @client = client
+ @name = name
+ @subscriptions = Hash.new { |hash, key| hash[key] = [] }
+ @queue = []
+ @state = STATE.Initialized
+
+ setup_event_handlers
end
- def initialize(client, name)
- @client = client
- @name = name
- @subscriptions = Hash.new { |hash, key| hash[key] = [] }
- @queue = []
+ # Publish a message on the channel
+ #
+ # @param event [String] The event name of the message
+ # @param data [String,ByteArray] payload for the message
+ # @yield [Ably::Realtime::Models::Message] On success, will call the block with the {Ably::Realtime::Models::Message}
+ # @return [Ably::Realtime::Models::Message]
+ #
+ def publish(event, data, &callback)
+ Models::Message.new({
+ name: event,
+ data: data,
+ timestamp: as_since_epoch(Time.now),
+ client_id: client.client_id
+ }, nil).tap do |message|
+ message.callback(&callback) if block_given?
+ queue_message message
+ end
+ end
- set_state :initialised
+ def subscribe(event = :all, &blk)
+ event = event.to_s unless event == :all
+ attach unless attached? || attaching?
+ @subscriptions[event] << blk
+ end
- on(:message) do |message|
- @subscriptions[:all].each { |cb| cb.call(message) }
+ def attach
+ unless attached? || attaching?
+ change_state STATE.Attaching
+ send_attach_protocol_message
+ end
+ end
+
+ def __incoming_protocol_msgbus__
+ @__incoming_protocol_msgbus__ ||= Ably::Util::PubSub.new(
+ coerce_into: Proc.new { |event| Models::ProtocolMessage::ACTION(event) }
+ )
+ end
+
+ private
+ attr_reader :queue
+
+ def setup_event_handlers
+ __incoming_protocol_msgbus__.subscribe(:message) do |message|
+ @subscriptions[:all].each { |cb| cb.call(message) }
@subscriptions[message.name].each { |cb| cb.call(message) }
end
on(:attached) do
- set_state :attached
process_queue
end
end
- # Current Channel state, will always be one of {STATES}
- #
- # @return [Symbol] state
- def state
- self.class.state_sym_for(@state)
- end
+ # Queue message and process queue if channel is attached.
+ # If channel is not yet attached, attempt to attach it before the message queue is processed.
+ def queue_message(message)
+ queue << message
- def state?(check_state)
- check_state = STATES.fetch(check_state) if check_state.kind_of?(Symbol)
- @state == check_state
- end
-
- def publish(event, data)
- queue << { name: event, data: data, timestamp: as_since_epoch(Time.now) }
-
if attached?
process_queue
else
attach
end
end
- def subscribe(event = :all, &blk)
- event = event.to_s unless event == :all
- attach unless attached?
- @subscriptions[event] << blk
+ def messages_in_queue?
+ !queue.empty?
end
- def attach
- unless state?(:attaching)
- set_state :attaching
- client.attach_to_channel(name)
+ # Move messages from Channel Queue into Outgoing Connection Queue
+ def process_queue
+ condition = -> { attached? && messages_in_queue? }
+ non_blocking_loop_while(condition) do
+ send_messages_within_protocol_message(queue.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE))
end
end
- def attached?
- state?(:attached)
+ def send_messages_within_protocol_message(messages)
+ client.connection.send_protocol_message(
+ action: Models::ProtocolMessage::ACTION.Message.to_i,
+ channel: name,
+ messages: messages
+ )
end
- private
- attr_reader :queue
-
- def set_state(new_state)
- new_state = STATES.fetch(new_state) if new_state.kind_of?(Symbol)
- raise ArgumentError, "#{new_state} is not a valid state" unless STATES.values.include?(new_state)
- @state = new_state
+ def send_attach_protocol_message
+ client.connection.send_protocol_message(
+ action: Models::ProtocolMessage::ACTION.Attach.to_i,
+ channel: name
+ )
end
- def process_queue
- client.send_messages(name, queue.shift(100)) until queue.empty?
+ # Used by {Ably::Modules::State} to debug state changes
+ def logger
+ client.logger
end
end
end
end