lib/ably/realtime/channel.rb in ably-1.1.8 vs lib/ably/realtime/channel.rb in ably-1.2.0
- old
+ new
@@ -34,10 +34,11 @@
include Ably::Modules::EventMachineHelpers
include Ably::Modules::AsyncWrapper
include Ably::Modules::MessageEmitter
include Ably::Realtime::Channel::Publisher
extend Ably::Modules::Enum
+ extend Forwardable
# ChannelState
# The permited states for this channel
STATE = ruby_enum('STATE',
:initialized,
@@ -90,30 +91,39 @@
# The Channel manager responsible for attaching, detaching and handling failures for this channel
# @return [Ably::Realtime::Channel::ChannelManager]
# @api private
attr_reader :manager
+ # Flag that specifies whether channel is resuming attachment(reattach) or is doing a 'clean attach' RTL4j1
+ # @return [Bolean]
+ # @api private
+ attr_reader :attach_resume
+
+ # ChannelOptions params attrribute (#RTL4k)
+ # return [Hash]
+ def_delegators :options, :params
+
# Initialize a new Channel object
#
# @param client [Ably::Rest::Client]
# @param name [String] The name of the channel
- # @param channel_options [Hash] Channel options, currently reserved for Encryption options
- # @option channel_options [Hash,Ably::Models::CipherParams] :cipher A hash of options or a {Ably::Models::CipherParams} to configure the encryption. *:key* is required, all other options are optional. See {Ably::Util::Crypto#initialize} for a list of +:cipher+ options
+ # @param channel_options [Hash, Ably::Models::ChannelOptions] A hash of options or a {Ably::Models::ChannelOptions}
#
def initialize(client, name, channel_options = {})
name = ensure_utf_8(:name, name)
- update_options channel_options
+ @options = Ably::Models::ChannelOptions(channel_options)
@client = client
@name = name
@queue = []
@state_machine = ChannelStateMachine.new(self)
@state = STATE(state_machine.current_state)
@manager = ChannelManager.new(self, client.connection)
@push = PushChannel.new(self)
@properties = ChannelProperties.new(self)
+ @attach_resume = false
setup_event_handlers
setup_presence
end
@@ -127,27 +137,35 @@
#
# @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published
# @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
#
# @example
- # # Publish a single message
+ # # Publish a single message form
# channel.publish 'click', { x: 1, y: 2 }
#
- # # Publish an array of message Hashes
+ # # Publish a single message with single Hash form
+ # message = { name: 'click', data: { x: 1, y: 2 } }
+ # channel.publish message
+ #
+ # # Publish an array of message Hashes form
# messages = [
- # { name: 'click', { x: 1, y: 2 } },
- # { name: 'click', { x: 2, y: 3 } }
+ # { name: 'click', data: { x: 1, y: 2 } },
+ # { name: 'click', data: { x: 2, y: 3 } }
# ]
# channel.publish messages
#
- # # Publish an array of Ably::Models::Message objects
+ # # Publish an array of Ably::Models::Message objects form
# messages = [
- # Ably::Models::Message(name: 'click', { x: 1, y: 2 })
- # Ably::Models::Message(name: 'click', { x: 2, y: 3 })
+ # Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
+ # Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
# ]
# channel.publish messages
#
+ # # Publish an array of Ably::Models::Message objects form
+ # message = Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
+ # channel.publish message
+ #
# channel.publish('click', 'body') do |message|
# puts "#{message.name} event received with #{message.data}"
# end
#
# channel.publish('click', 'body').errback do |error, message|
@@ -163,17 +181,11 @@
if !connection.can_publish_messages?
error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}")
return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end
- messages = if name.kind_of?(Enumerable)
- name
- else
- name = ensure_utf_8(:name, name, allow_nil: true)
- ensure_supported_payload data
- [{ name: name, data: data }.merge(attributes)]
- end
+ messages = build_messages(name, data, attributes) # (RSL1a, RSL1b)
if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end
@@ -307,37 +319,55 @@
@__incoming_msgbus__ ||= Ably::Util::PubSub.new(
coerce_into: lambda { |event| Ably::Models::ProtocolMessage::ACTION(event) }
)
end
+ # Sets or updates the stored channel options. (#RTL16)
+ # @param channel_options [Hash, Ably::Models::ChannelOptions] A hash of options or a {Ably::Models::ChannelOptions}
+ # @return [Ably::Models::ChannelOptions]
+ def set_options(channel_options)
+ @options = Ably::Models::ChannelOptions(channel_options)
+
+ manager.request_reattach if need_reattach?
+ end
+ alias options= set_options
+
# @api private
def set_channel_error_reason(error)
@error_reason = error
end
# @api private
def clear_error_reason
@error_reason = nil
end
- # @api private
- def update_options(channel_options)
- @options = channel_options.clone.freeze
- end
- alias set_options update_options # (RSL7)
- alias options= update_options
-
# Used by {Ably::Modules::StateEmitter} to debug state changes
# @api private
def logger
client.logger
end
+ # @api private
+ def attach_resume!
+ @attach_resume = true
+ end
+
+ # @api private
+ def reset_attach_resume!
+ @attach_resume = false
+ end
+
# As we are using a state machine, do not allow change_state to be used
# #transition_state_machine must be used instead
private :change_state
+ def need_reattach?
+ !!(attaching? || attached?) && !!(options.modes || options.params)
+ end
+
private
+
def setup_event_handlers
__incoming_msgbus__.subscribe(:message) do |message|
message.decode(client.encoders, options) do |encode_error, error_message|
client.logger.error error_message
end