lib/ably/realtime/channel.rb in ably-1.0.7 vs lib/ably/realtime/channel.rb in ably-1.1.0
- old
+ new
@@ -1,5 +1,7 @@
+require 'ably/realtime/channel/publisher'
+
module Ably
module Realtime
# The Channel class represents a Channel belonging to this application.
# The Channel instance allows messages to be published and
# received, and controls the lifecycle of this instance's
@@ -30,10 +32,11 @@
include Ably::Modules::Conversions
include Ably::Modules::EventEmitter
include Ably::Modules::EventMachineHelpers
include Ably::Modules::AsyncWrapper
include Ably::Modules::MessageEmitter
+ include Ably::Realtime::Channel::Publisher
extend Ably::Modules::Enum
# ChannelState
# The permited states for this channel
STATE = ruby_enum('STATE',
@@ -59,16 +62,21 @@
# Max number of messages to bundle in a single ProtocolMessage
MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50
# {Ably::Realtime::Client} associated with this channel
# @return [Ably::Realtime::Client]
+ # @api private
attr_reader :client
# Channel name
# @return [String]
attr_reader :name
+ # Push channel used for push notification
+ # @return [Ably::Realtime::Channel::PushChannel]
+ attr_reader :push
+
# Channel options configured for this channel, see {#initialize} for channel_options
# @return [Hash]
attr_reader :options
# When a channel failure occurs this attribute contains the Ably Exception
@@ -101,10 +109,11 @@
@queue = []
@state_machine = ChannelStateMachine.new(self)
@state = STATE(state_machine.current_state)
@manager = ChannelManager.new(self, client.connection)
+ @push = PushChannel.new(self)
setup_event_handlers
setup_presence
end
@@ -114,11 +123,11 @@
#
# @param name [String, Array<Ably::Models::Message|Hash>, nil] The event name of the message to publish, or an Array of [Ably::Model::Message] objects or [Hash] objects with +:name+ and +:data+ pairs
# @param data [String, ByteArray, nil] The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument
# @param attributes [Hash, nil] Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
#
- # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is publishde, or an Array of {Ably::Models::Message} when multiple messages are published
+ # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published
# @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
#
# @example
# # Publish a single message
# channel.publish 'click', { x: 1, y: 2 }
@@ -144,17 +153,17 @@
# channel.publish('click', 'body').errback do |error, message|
# puts "#{message.name} was not received, error #{error.message}"
# end
#
def publish(name, data = nil, attributes = {}, &success_block)
- if detached? || detaching? || failed?
+ if suspended? || failed?
error = Ably::Exceptions::ChannelInactive.new("Cannot publish messages on a channel in state #{state}")
return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end
if !connection.can_publish_messages?
- error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}")
+ error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}")
return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
end
messages = if name.kind_of?(Enumerable)
name
@@ -162,11 +171,16 @@
name = ensure_utf_8(:name, name, allow_nil: true)
ensure_supported_payload data
[{ name: name, data: data }.merge(attributes)]
end
- queue_messages(messages).tap do |deferrable|
+ if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
+ error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
+ return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
+ end
+
+ enqueue_messages_on_connection(client, messages, channel_name, options).tap do |deferrable|
deferrable.callback(&success_block) if block_given?
end
end
# Subscribe to messages matching providing event name, or all messages if event name not provided.
@@ -319,16 +333,10 @@
# @api private
def logger
client.logger
end
- # Internal queue used for messages published that cannot yet be enqueued on the connection
- # @api private
- def __queue__
- @queue
- end
-
# As we are using a state machine, do not allow change_state to be used
# #transition_state_machine must be used instead
private :change_state
private
@@ -337,104 +345,12 @@
message.decode(client.encoders, options) do |encode_error, error_message|
client.logger.error error_message
end
emit_message message.name, message
end
-
- unsafe_on(STATE.Attached) do
- process_queue
- end
end
- # Queue messages and process queue if channel is attached.
- # If channel is not yet attached, attempt to attach it before the message queue is processed.
- # @return [Ably::Util::SafeDeferrable]
- def queue_messages(raw_messages)
- messages = Array(raw_messages).map do |raw_msg|
- create_message(raw_msg).tap do |message|
- next if message.client_id.nil?
- if message.client_id == '*'
- raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages')
- end
- if message.client_id && !message.client_id.kind_of?(String)
- raise Ably::Exceptions::IncompatibleClientId.new('client_id must be a String when publishing messages')
- end
- unless client.auth.can_assume_client_id?(message.client_id)
- raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{message.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'")
- end
- end
- end
-
- __queue__.push(*messages)
-
- if attached?
- process_queue
- else
- attach
- end
-
- if messages.count == 1
- # A message is a Deferrable so, if publishing only one message, simply return that Deferrable
- messages.first
- else
- deferrable_for_multiple_messages(messages)
- end
- end
-
- # A deferrable object that calls the success callback once all messages are delivered
- # If any message fails, the errback is called immediately
- # Only one callback or errback is ever called i.e. if a group of messages all fail, only once
- # errback will be invoked
- def deferrable_for_multiple_messages(messages)
- expected_deliveries = messages.count
- actual_deliveries = 0
- failed = false
-
- Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
- messages.each do |message|
- message.callback do
- next if failed
- actual_deliveries += 1
- deferrable.succeed messages if actual_deliveries == expected_deliveries
- end
- message.errback do |error|
- next if failed
- failed = true
- deferrable.fail error, message
- end
- end
- end
- end
-
- def messages_in_queue?
- !__queue__.empty?
- end
-
- # Move messages from Channel Queue into Outgoing Connection Queue
- def process_queue
- condition = -> { attached? && messages_in_queue? }
- non_blocking_loop_while(condition) do
- send_messages_within_protocol_message __queue__.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE)
- end
- end
-
- def send_messages_within_protocol_message(messages)
- connection.send_protocol_message(
- action: Ably::Models::ProtocolMessage::ACTION.Message.to_i,
- channel: name,
- messages: messages
- )
- end
-
- def create_message(message)
- Ably::Models::Message(message.dup).tap do |msg|
- msg.encode(client.encoders, options) do |encode_error, error_message|
- client.logger.error error_message
- end
- end
- end
-
def rest_channel
client.rest_client.channel(name)
end
def connection
@@ -442,11 +358,17 @@
end
def setup_presence
@presence ||= Presence.new(self)
end
+
+ # Alias useful for methods with a name argument
+ def channel_name
+ name
+ end
end
end
end
require 'ably/realtime/channel/channel_manager'
require 'ably/realtime/channel/channel_state_machine'
+require 'ably/realtime/channel/push_channel'