lib/ably/realtime/channel.rb in ably-0.8.15 vs lib/ably/realtime/channel.rb in ably-1.0.0
- old
+ new
@@ -21,12 +21,10 @@
# Channel::STATE.Attached
# Channel::STATE.Detaching
# Channel::STATE.Detached
# Channel::STATE.Failed
#
- # Channels emit errors - use +on(:error)+ to subscribe to errors
- #
# @!attribute [r] state
# @return {Ably::Realtime::Connection::STATE} channel state
#
class Channel
include Ably::Modules::Conversions
@@ -34,18 +32,28 @@
include Ably::Modules::EventMachineHelpers
include Ably::Modules::AsyncWrapper
include Ably::Modules::MessageEmitter
extend Ably::Modules::Enum
+ # ChannelState
+ # The permited states for this channel
STATE = ruby_enum('STATE',
:initialized,
:attaching,
:attached,
:detaching,
:detached,
+ :suspended,
:failed
)
+
+ # ChannelEvent
+ # The permitted channel events that are emitted for this channel
+ EVENT = ruby_enum('EVENT',
+ STATE.to_sym_arr + [:update]
+ )
+
include Ably::Modules::StateEmitter
include Ably::Modules::UsesStateMachine
ensure_state_machine_emits 'Ably::Models::ChannelStateChange'
# Max number of messages to bundle in a single ProtocolMessage
@@ -136,15 +144,18 @@
# channel.publish('click', 'body').errback do |error, message|
# puts "#{message.name} was not received, error #{error.message}"
# end
#
def publish(name, data = nil, attributes = {}, &success_block)
- raise Ably::Exceptions::ChannelInactive.new('Cannot publish messages on a detached channel') if detached? || detaching?
- raise Ably::Exceptions::ChannelInactive.new('Cannot publish messages on a failed channel') if failed?
+ if detached? || detaching? || failed?
+ error = Ably::Exceptions::ChannelInactive.new("Cannot publish messages on a channel in state #{state}")
+ return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
+ end
if !connection.can_publish_messages?
- raise Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}")
+ error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}")
+ return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end
messages = if name.kind_of?(Enumerable)
name
else
@@ -190,14 +201,23 @@
# @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state
# @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback
#
def attach(&success_block)
if connection.closing? || connection.closed? || connection.suspended? || connection.failed?
- raise Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}")
+ error = Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}")
+ return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end
- transition_state_machine :attaching if can_transition_to?(:attaching)
+ if !attached?
+ if detaching?
+ # Let the pending operation complete (#RTL4h)
+ once_state_changed { transition_state_machine :attaching if can_transition_to?(:attaching) }
+ else
+ transition_state_machine :attaching if can_transition_to?(:attaching)
+ end
+ end
+
deferrable_for_state_change_to(STATE.Attached, &success_block)
end
# Detach this channel, and call the block if provided when in a Detached or Failed state
#
@@ -205,18 +225,28 @@
# @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback
#
def detach(&success_block)
if initialized?
success_block.call if block_given?
- return Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
- EventMachine.next_tick { deferrable.succeed }
- end
+ return Ably::Util::SafeDeferrable.new_and_succeed_immediately(logger)
end
- raise exception_for_state_change_to(:detaching) if failed?
+ if failed? || connection.closing? || connection.failed?
+ return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:detaching))
+ end
- transition_state_machine :detaching if can_transition_to?(:detaching)
+ if !detached?
+ if attaching?
+ # Let the pending operation complete (#RTL5i)
+ once_state_changed { transition_state_machine :detaching if can_transition_to?(:detaching) }
+ elsif can_transition_to?(:detaching)
+ transition_state_machine :detaching
+ else
+ transition_state_machine! :detached
+ end
+ end
+
deferrable_for_state_change_to(STATE.Detached, &success_block)
end
# Presence object for this Channel. This controls this client's
# presence on the channel and may also be used to obtain presence information
@@ -242,11 +272,14 @@
#
# @return [Ably::Util::SafeDeferrable]
#
def history(options = {}, &callback)
if options.delete(:until_attach)
- raise ArgumentError, 'option :until_attach is invalid as the channel is not attached' unless attached?
+ unless attached?
+ error = Ably::Exceptions::InvalidRequest.new('option :until_attach is invalid as the channel is not attached' )
+ return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
+ end
options[:from_serial] = attached_serial
end
async_wrap(callback) do
rest_channel.history(options.merge(async_blocking_operations: true))
@@ -261,11 +294,11 @@
coerce_into: Proc.new { |event| Ably::Models::ProtocolMessage::ACTION(event) }
)
end
# @api private
- def set_failed_channel_error_reason(error)
+ def set_channel_error_reason(error)
@error_reason = error
end
# @api private
def clear_error_reason
@@ -286,26 +319,30 @@
# @api private
def logger
client.logger
end
+ # Internal queue used for messages published that cannot yet be enqueued on the connection
+ # @api private
+ def __queue__
+ @queue
+ end
+
# As we are using a state machine, do not allow change_state to be used
# #transition_state_machine must be used instead
private :change_state
private
- def queue
- @queue
- end
-
def setup_event_handlers
__incoming_msgbus__.subscribe(:message) do |message|
- message.decode self
+ message.decode(client.encoders, options) do |encode_error, error_message|
+ client.logger.error error_message
+ end
emit_message message.name, message
end
- on(STATE.Attached) do
+ unsafe_on(STATE.Attached) do
process_queue
end
end
# Queue messages and process queue if channel is attached.
@@ -314,19 +351,22 @@
def queue_messages(raw_messages)
messages = Array(raw_messages).map do |raw_msg|
create_message(raw_msg).tap do |message|
next if message.client_id.nil?
if message.client_id == '*'
- raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages', 400, 40012)
+ raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages')
end
+ if message.client_id && !message.client_id.kind_of?(String)
+ raise Ably::Exceptions::IncompatibleClientId.new('client_id must be a String when publishing messages')
+ end
unless client.auth.can_assume_client_id?(message.client_id)
- raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{message.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'", 400, 40012)
+ raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{message.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'")
end
end
end
- queue.push(*messages)
+ __queue__.push(*messages)
if attached?
process_queue
else
attach
@@ -364,18 +404,18 @@
end
end
end
def messages_in_queue?
- !queue.empty?
+ !__queue__.empty?
end
# Move messages from Channel Queue into Outgoing Connection Queue
def process_queue
condition = -> { attached? && messages_in_queue? }
non_blocking_loop_while(condition) do
- send_messages_within_protocol_message queue.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE)
+ send_messages_within_protocol_message __queue__.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE)
end
end
def send_messages_within_protocol_message(messages)
connection.send_protocol_message(
@@ -385,10 +425,12 @@
)
end
def create_message(message)
Ably::Models::Message(message.dup).tap do |msg|
- msg.encode self
+ msg.encode(client.encoders, options) do |encode_error, error_message|
+ client.logger.error error_message
+ end
end
end
def rest_channel
client.rest_client.channel(name)