lib/ably/realtime/channel.rb in ably-1.1.8 vs lib/ably/realtime/channel.rb in ably-1.2.0

- old
+ new

@@ -34,10 +34,11 @@ include Ably::Modules::EventMachineHelpers include Ably::Modules::AsyncWrapper include Ably::Modules::MessageEmitter include Ably::Realtime::Channel::Publisher extend Ably::Modules::Enum + extend Forwardable # ChannelState # The permited states for this channel STATE = ruby_enum('STATE', :initialized, @@ -90,30 +91,39 @@ # The Channel manager responsible for attaching, detaching and handling failures for this channel # @return [Ably::Realtime::Channel::ChannelManager] # @api private attr_reader :manager + # Flag that specifies whether channel is resuming attachment(reattach) or is doing a 'clean attach' RTL4j1 + # @return [Bolean] + # @api private + attr_reader :attach_resume + + # ChannelOptions params attrribute (#RTL4k) + # return [Hash] + def_delegators :options, :params + # Initialize a new Channel object # # @param client [Ably::Rest::Client] # @param name [String] The name of the channel - # @param channel_options [Hash] Channel options, currently reserved for Encryption options - # @option channel_options [Hash,Ably::Models::CipherParams] :cipher A hash of options or a {Ably::Models::CipherParams} to configure the encryption. *:key* is required, all other options are optional. See {Ably::Util::Crypto#initialize} for a list of +:cipher+ options + # @param channel_options [Hash, Ably::Models::ChannelOptions] A hash of options or a {Ably::Models::ChannelOptions} # def initialize(client, name, channel_options = {}) name = ensure_utf_8(:name, name) - update_options channel_options + @options = Ably::Models::ChannelOptions(channel_options) @client = client @name = name @queue = [] @state_machine = ChannelStateMachine.new(self) @state = STATE(state_machine.current_state) @manager = ChannelManager.new(self, client.connection) @push = PushChannel.new(self) @properties = ChannelProperties.new(self) + @attach_resume = false setup_event_handlers setup_presence end @@ -127,27 +137,35 @@ # # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks # # @example - # # Publish a single message + # # Publish a single message form # channel.publish 'click', { x: 1, y: 2 } # - # # Publish an array of message Hashes + # # Publish a single message with single Hash form + # message = { name: 'click', data: { x: 1, y: 2 } } + # channel.publish message + # + # # Publish an array of message Hashes form # messages = [ - # { name: 'click', { x: 1, y: 2 } }, - # { name: 'click', { x: 2, y: 3 } } + # { name: 'click', data: { x: 1, y: 2 } }, + # { name: 'click', data: { x: 2, y: 3 } } # ] # channel.publish messages # - # # Publish an array of Ably::Models::Message objects + # # Publish an array of Ably::Models::Message objects form # messages = [ - # Ably::Models::Message(name: 'click', { x: 1, y: 2 }) - # Ably::Models::Message(name: 'click', { x: 2, y: 3 }) + # Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }) + # Ably::Models::Message(name: 'click', data: { x: 2, y: 3 }) # ] # channel.publish messages # + # # Publish an array of Ably::Models::Message objects form + # message = Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }) + # channel.publish message + # # channel.publish('click', 'body') do |message| # puts "#{message.name} event received with #{message.data}" # end # # channel.publish('click', 'body').errback do |error, message| @@ -163,17 +181,11 @@ if !connection.can_publish_messages? error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}") return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end - messages = if name.kind_of?(Enumerable) - name - else - name = ensure_utf_8(:name, name, allow_nil: true) - ensure_supported_payload data - [{ name: name, data: data }.merge(attributes)] - end + messages = build_messages(name, data, attributes) # (RSL1a, RSL1b) if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.") return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end @@ -307,37 +319,55 @@ @__incoming_msgbus__ ||= Ably::Util::PubSub.new( coerce_into: lambda { |event| Ably::Models::ProtocolMessage::ACTION(event) } ) end + # Sets or updates the stored channel options. (#RTL16) + # @param channel_options [Hash, Ably::Models::ChannelOptions] A hash of options or a {Ably::Models::ChannelOptions} + # @return [Ably::Models::ChannelOptions] + def set_options(channel_options) + @options = Ably::Models::ChannelOptions(channel_options) + + manager.request_reattach if need_reattach? + end + alias options= set_options + # @api private def set_channel_error_reason(error) @error_reason = error end # @api private def clear_error_reason @error_reason = nil end - # @api private - def update_options(channel_options) - @options = channel_options.clone.freeze - end - alias set_options update_options # (RSL7) - alias options= update_options - # Used by {Ably::Modules::StateEmitter} to debug state changes # @api private def logger client.logger end + # @api private + def attach_resume! + @attach_resume = true + end + + # @api private + def reset_attach_resume! + @attach_resume = false + end + # As we are using a state machine, do not allow change_state to be used # #transition_state_machine must be used instead private :change_state + def need_reattach? + !!(attaching? || attached?) && !!(options.modes || options.params) + end + private + def setup_event_handlers __incoming_msgbus__.subscribe(:message) do |message| message.decode(client.encoders, options) do |encode_error, error_message| client.logger.error error_message end