lib/ably/realtime/channel.rb in ably-1.0.7 vs lib/ably/realtime/channel.rb in ably-1.1.0

- old
+ new

@@ -1,5 +1,7 @@ +require 'ably/realtime/channel/publisher' + module Ably module Realtime # The Channel class represents a Channel belonging to this application. # The Channel instance allows messages to be published and # received, and controls the lifecycle of this instance's @@ -30,10 +32,11 @@ include Ably::Modules::Conversions include Ably::Modules::EventEmitter include Ably::Modules::EventMachineHelpers include Ably::Modules::AsyncWrapper include Ably::Modules::MessageEmitter + include Ably::Realtime::Channel::Publisher extend Ably::Modules::Enum # ChannelState # The permited states for this channel STATE = ruby_enum('STATE', @@ -59,16 +62,21 @@ # Max number of messages to bundle in a single ProtocolMessage MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50 # {Ably::Realtime::Client} associated with this channel # @return [Ably::Realtime::Client] + # @api private attr_reader :client # Channel name # @return [String] attr_reader :name + # Push channel used for push notification + # @return [Ably::Realtime::Channel::PushChannel] + attr_reader :push + # Channel options configured for this channel, see {#initialize} for channel_options # @return [Hash] attr_reader :options # When a channel failure occurs this attribute contains the Ably Exception @@ -101,10 +109,11 @@ @queue = [] @state_machine = ChannelStateMachine.new(self) @state = STATE(state_machine.current_state) @manager = ChannelManager.new(self, client.connection) + @push = PushChannel.new(self) setup_event_handlers setup_presence end @@ -114,11 +123,11 @@ # # @param name [String, Array<Ably::Models::Message|Hash>, nil] The event name of the message to publish, or an Array of [Ably::Model::Message] objects or [Hash] objects with +:name+ and +:data+ pairs # @param data [String, ByteArray, nil] The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument # @param attributes [Hash, nil] Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string # - # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is publishde, or an Array of {Ably::Models::Message} when multiple messages are published + # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is published, or an Array of {Ably::Models::Message} when multiple messages are published # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks # # @example # # Publish a single message # channel.publish 'click', { x: 1, y: 2 } @@ -144,17 +153,17 @@ # channel.publish('click', 'body').errback do |error, message| # puts "#{message.name} was not received, error #{error.message}" # end # def publish(name, data = nil, attributes = {}, &success_block) - if detached? || detaching? || failed? + if suspended? || failed? error = Ably::Exceptions::ChannelInactive.new("Cannot publish messages on a channel in state #{state}") return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end if !connection.can_publish_messages? - error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}") + error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}") return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end messages = if name.kind_of?(Enumerable) name @@ -162,11 +171,16 @@ name = ensure_utf_8(:name, name, allow_nil: true) ensure_supported_payload data [{ name: name, data: data }.merge(attributes)] end - queue_messages(messages).tap do |deferrable| + if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE + error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.") + return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) + end + + enqueue_messages_on_connection(client, messages, channel_name, options).tap do |deferrable| deferrable.callback(&success_block) if block_given? end end # Subscribe to messages matching providing event name, or all messages if event name not provided. @@ -319,16 +333,10 @@ # @api private def logger client.logger end - # Internal queue used for messages published that cannot yet be enqueued on the connection - # @api private - def __queue__ - @queue - end - # As we are using a state machine, do not allow change_state to be used # #transition_state_machine must be used instead private :change_state private @@ -337,104 +345,12 @@ message.decode(client.encoders, options) do |encode_error, error_message| client.logger.error error_message end emit_message message.name, message end - - unsafe_on(STATE.Attached) do - process_queue - end end - # Queue messages and process queue if channel is attached. - # If channel is not yet attached, attempt to attach it before the message queue is processed. - # @return [Ably::Util::SafeDeferrable] - def queue_messages(raw_messages) - messages = Array(raw_messages).map do |raw_msg| - create_message(raw_msg).tap do |message| - next if message.client_id.nil? - if message.client_id == '*' - raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages') - end - if message.client_id && !message.client_id.kind_of?(String) - raise Ably::Exceptions::IncompatibleClientId.new('client_id must be a String when publishing messages') - end - unless client.auth.can_assume_client_id?(message.client_id) - raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{message.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'") - end - end - end - - __queue__.push(*messages) - - if attached? - process_queue - else - attach - end - - if messages.count == 1 - # A message is a Deferrable so, if publishing only one message, simply return that Deferrable - messages.first - else - deferrable_for_multiple_messages(messages) - end - end - - # A deferrable object that calls the success callback once all messages are delivered - # If any message fails, the errback is called immediately - # Only one callback or errback is ever called i.e. if a group of messages all fail, only once - # errback will be invoked - def deferrable_for_multiple_messages(messages) - expected_deliveries = messages.count - actual_deliveries = 0 - failed = false - - Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| - messages.each do |message| - message.callback do - next if failed - actual_deliveries += 1 - deferrable.succeed messages if actual_deliveries == expected_deliveries - end - message.errback do |error| - next if failed - failed = true - deferrable.fail error, message - end - end - end - end - - def messages_in_queue? - !__queue__.empty? - end - - # Move messages from Channel Queue into Outgoing Connection Queue - def process_queue - condition = -> { attached? && messages_in_queue? } - non_blocking_loop_while(condition) do - send_messages_within_protocol_message __queue__.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE) - end - end - - def send_messages_within_protocol_message(messages) - connection.send_protocol_message( - action: Ably::Models::ProtocolMessage::ACTION.Message.to_i, - channel: name, - messages: messages - ) - end - - def create_message(message) - Ably::Models::Message(message.dup).tap do |msg| - msg.encode(client.encoders, options) do |encode_error, error_message| - client.logger.error error_message - end - end - end - def rest_channel client.rest_client.channel(name) end def connection @@ -442,11 +358,17 @@ end def setup_presence @presence ||= Presence.new(self) end + + # Alias useful for methods with a name argument + def channel_name + name + end end end end require 'ably/realtime/channel/channel_manager' require 'ably/realtime/channel/channel_state_machine' +require 'ably/realtime/channel/push_channel'