lib/ably/realtime/channel.rb in ably-0.8.15 vs lib/ably/realtime/channel.rb in ably-1.0.0

- old
+ new

@@ -21,12 +21,10 @@ # Channel::STATE.Attached # Channel::STATE.Detaching # Channel::STATE.Detached # Channel::STATE.Failed # - # Channels emit errors - use +on(:error)+ to subscribe to errors - # # @!attribute [r] state # @return {Ably::Realtime::Connection::STATE} channel state # class Channel include Ably::Modules::Conversions @@ -34,18 +32,28 @@ include Ably::Modules::EventMachineHelpers include Ably::Modules::AsyncWrapper include Ably::Modules::MessageEmitter extend Ably::Modules::Enum + # ChannelState + # The permited states for this channel STATE = ruby_enum('STATE', :initialized, :attaching, :attached, :detaching, :detached, + :suspended, :failed ) + + # ChannelEvent + # The permitted channel events that are emitted for this channel + EVENT = ruby_enum('EVENT', + STATE.to_sym_arr + [:update] + ) + include Ably::Modules::StateEmitter include Ably::Modules::UsesStateMachine ensure_state_machine_emits 'Ably::Models::ChannelStateChange' # Max number of messages to bundle in a single ProtocolMessage @@ -136,15 +144,18 @@ # channel.publish('click', 'body').errback do |error, message| # puts "#{message.name} was not received, error #{error.message}" # end # def publish(name, data = nil, attributes = {}, &success_block) - raise Ably::Exceptions::ChannelInactive.new('Cannot publish messages on a detached channel') if detached? || detaching? - raise Ably::Exceptions::ChannelInactive.new('Cannot publish messages on a failed channel') if failed? + if detached? || detaching? || failed? + error = Ably::Exceptions::ChannelInactive.new("Cannot publish messages on a channel in state #{state}") + return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) + end if !connection.can_publish_messages? - raise Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}") + error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}") + return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end messages = if name.kind_of?(Enumerable) name else @@ -190,14 +201,23 @@ # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback # def attach(&success_block) if connection.closing? || connection.closed? || connection.suspended? || connection.failed? - raise Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}") + error = Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}") + return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end - transition_state_machine :attaching if can_transition_to?(:attaching) + if !attached? + if detaching? + # Let the pending operation complete (#RTL4h) + once_state_changed { transition_state_machine :attaching if can_transition_to?(:attaching) } + else + transition_state_machine :attaching if can_transition_to?(:attaching) + end + end + deferrable_for_state_change_to(STATE.Attached, &success_block) end # Detach this channel, and call the block if provided when in a Detached or Failed state # @@ -205,18 +225,28 @@ # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback # def detach(&success_block) if initialized? success_block.call if block_given? - return Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| - EventMachine.next_tick { deferrable.succeed } - end + return Ably::Util::SafeDeferrable.new_and_succeed_immediately(logger) end - raise exception_for_state_change_to(:detaching) if failed? + if failed? || connection.closing? || connection.failed? + return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:detaching)) + end - transition_state_machine :detaching if can_transition_to?(:detaching) + if !detached? + if attaching? + # Let the pending operation complete (#RTL5i) + once_state_changed { transition_state_machine :detaching if can_transition_to?(:detaching) } + elsif can_transition_to?(:detaching) + transition_state_machine :detaching + else + transition_state_machine! :detached + end + end + deferrable_for_state_change_to(STATE.Detached, &success_block) end # Presence object for this Channel. This controls this client's # presence on the channel and may also be used to obtain presence information @@ -242,11 +272,14 @@ # # @return [Ably::Util::SafeDeferrable] # def history(options = {}, &callback) if options.delete(:until_attach) - raise ArgumentError, 'option :until_attach is invalid as the channel is not attached' unless attached? + unless attached? + error = Ably::Exceptions::InvalidRequest.new('option :until_attach is invalid as the channel is not attached' ) + return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) + end options[:from_serial] = attached_serial end async_wrap(callback) do rest_channel.history(options.merge(async_blocking_operations: true)) @@ -261,11 +294,11 @@ coerce_into: Proc.new { |event| Ably::Models::ProtocolMessage::ACTION(event) } ) end # @api private - def set_failed_channel_error_reason(error) + def set_channel_error_reason(error) @error_reason = error end # @api private def clear_error_reason @@ -286,26 +319,30 @@ # @api private def logger client.logger end + # Internal queue used for messages published that cannot yet be enqueued on the connection + # @api private + def __queue__ + @queue + end + # As we are using a state machine, do not allow change_state to be used # #transition_state_machine must be used instead private :change_state private - def queue - @queue - end - def setup_event_handlers __incoming_msgbus__.subscribe(:message) do |message| - message.decode self + message.decode(client.encoders, options) do |encode_error, error_message| + client.logger.error error_message + end emit_message message.name, message end - on(STATE.Attached) do + unsafe_on(STATE.Attached) do process_queue end end # Queue messages and process queue if channel is attached. @@ -314,19 +351,22 @@ def queue_messages(raw_messages) messages = Array(raw_messages).map do |raw_msg| create_message(raw_msg).tap do |message| next if message.client_id.nil? if message.client_id == '*' - raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages', 400, 40012) + raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages') end + if message.client_id && !message.client_id.kind_of?(String) + raise Ably::Exceptions::IncompatibleClientId.new('client_id must be a String when publishing messages') + end unless client.auth.can_assume_client_id?(message.client_id) - raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{message.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'", 400, 40012) + raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{message.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'") end end end - queue.push(*messages) + __queue__.push(*messages) if attached? process_queue else attach @@ -364,18 +404,18 @@ end end end def messages_in_queue? - !queue.empty? + !__queue__.empty? end # Move messages from Channel Queue into Outgoing Connection Queue def process_queue condition = -> { attached? && messages_in_queue? } non_blocking_loop_while(condition) do - send_messages_within_protocol_message queue.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE) + send_messages_within_protocol_message __queue__.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE) end end def send_messages_within_protocol_message(messages) connection.send_protocol_message( @@ -385,10 +425,12 @@ ) end def create_message(message) Ably::Models::Message(message.dup).tap do |msg| - msg.encode self + msg.encode(client.encoders, options) do |encode_error, error_message| + client.logger.error error_message + end end end def rest_channel client.rest_client.channel(name)