lib/ably/realtime/channel.rb in ably-0.1.5 vs lib/ably/realtime/channel.rb in ably-0.1.6

- old
+ new

@@ -38,77 +38,166 @@ :attached, :detaching, :detached, :failed ) - include Ably::Modules::State + include Ably::Modules::StateEmitter # Max number of messages to bundle in a single ProtocolMessage MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50 - attr_reader :client, :name + attr_reader :client, :name, :options - def initialize(client, name) + # Initialize a new Channel object + # + # @param client [Ably::Rest::Client] + # @param name [String] The name of the channel + # @param channel_options [Hash] Channel options, currently reserved for future Encryption options + def initialize(client, name, channel_options = {}) @client = client @name = name + @options = channel_options.clone.freeze @subscriptions = Hash.new { |hash, key| hash[key] = [] } @queue = [] @state = STATE.Initialized setup_event_handlers end # Publish a message on the channel # - # @param event [String] The event name of the message + # @param name [String] The event name of the message # @param data [String,ByteArray] payload for the message - # @yield [Ably::Realtime::Models::Message] On success, will call the block with the {Ably::Realtime::Models::Message} - # @return [Ably::Realtime::Models::Message] + # @yield [Ably::Models::Message] On success, will call the block with the {Ably::Models::Message} + # @return [Ably::Models::Message] Deferrable {Ably::Models::Message} that supports both success (callback) and failure (errback) callbacks # - def publish(event, data, &callback) - Models::Message.new({ - name: event, - data: data, - timestamp: as_since_epoch(Time.now), - client_id: client.client_id - }, nil).tap do |message| + # @example + # channel.publish('click', 'body') + # + # channel.publish('click', 'body') do |message| + # puts "#{message.name} event received with #{message.data}" + # end + # + # channel.publish('click', 'body').errback do |message, error| + # puts "#{message.name} was not received, error #{error.message}" + # end + # + def publish(name, data, &callback) + create_message(name, data).tap do |message| message.callback(&callback) if block_given? queue_message message end end - def subscribe(event = :all, &blk) - event = event.to_s unless event == :all + # Subscribe to messages matching providing event name, or all messages if event name not provided + # + # @param name [String] The event name of the message to subscribe to if provided. Defaults to all events. + # @yield [Ably::Models::Message] For each message received, the block is called + # + def subscribe(name = :all, &blk) attach unless attached? || attaching? - @subscriptions[event] << blk + subscriptions[message_name_key(name)] << blk end - def attach - unless attached? || attaching? - change_state STATE.Attaching - send_attach_protocol_message + # Unsubscribe the matching block for messages matching providing event name, or all messages if event name not provided. + # If a block is not provided, all subscriptions will be unsubscribed + # + # @param name [String] The event name of the message to subscribe to if provided. Defaults to all events. + # + def unsubscribe(name = :all, &blk) + if message_name_key(name) == :all + subscriptions.keys + else + Array(message_name_key(name)) + end.each do |key| + subscriptions[key].delete_if do |block| + !block_given? || blk == block + end end end - def __incoming_protocol_msgbus__ - @__incoming_protocol_msgbus__ ||= Ably::Util::PubSub.new( + # Attach to this channel, and call the block if provided when attached. + # Attaching to a channel is implicit in when a message is published or #subscribe is called, so it is uncommon + # to need to call attach explicitly. + # + # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state + # + def attach(&block) + if attached? + block.call self if block_given? + else + once(STATE.Attached) { block.call self } if block_given? + if !attaching? + change_state STATE.Attaching + send_attach_protocol_message + end + end + end + + # Detach this channel, and call the block if provided when in a Detached or Failed state + # + # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Detached or Failed state + # + def detach(&block) + if detached? || failed? + block.call self if block_given? + else + once(STATE.Detached, STATE.Failed) { block.call self } if block_given? + if !detaching? + change_state STATE.Detaching + send_detach_protocol_message + end + end + end + + # Presence object for this Channel. This controls this client's + # presence on the channel and may also be used to obtain presence information + # and change events for other members of the channel. + # + # @return {Ably::Realtime::Presence} + # + def presence + @presence ||= Presence.new(self) + end + + # Return the message history of the channel + # + # @param (see Ably::Rest::Channel#history) + # @option options (see Ably::Rest::Channel#history) + def history(options = {}) + rest_channel.history(options) + end + + # @!attribute [r] __incoming_msgbus__ + # @return [Ably::Util::PubSub] Client library internal channel incoming message bus + # @api private + def __incoming_msgbus__ + @__incoming_msgbus__ ||= Ably::Util::PubSub.new( coerce_into: Proc.new { |event| Models::ProtocolMessage::ACTION(event) } ) end private - attr_reader :queue + attr_reader :queue, :subscriptions def setup_event_handlers - __incoming_protocol_msgbus__.subscribe(:message) do |message| - @subscriptions[:all].each { |cb| cb.call(message) } - @subscriptions[message.name].each { |cb| cb.call(message) } + __incoming_msgbus__.subscribe(:message) do |message| + subscriptions[:all].each { |cb| cb.call(message) } + subscriptions[message.name].each { |cb| cb.call(message) } end - on(:attached) do + on(STATE.Attached) do process_queue end + + connection.on(Connection::STATE.Closed) do + change_state STATE.Detached + end + + connection.on(Connection::STATE.Failed) do + change_state STATE.Failed unless detached? || initialized? + end end # Queue message and process queue if channel is attached. # If channel is not yet attached, attempt to attach it before the message queue is processed. def queue_message(message) @@ -127,11 +216,11 @@ # Move messages from Channel Queue into Outgoing Connection Queue def process_queue condition = -> { attached? && messages_in_queue? } non_blocking_loop_while(condition) do - send_messages_within_protocol_message(queue.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE)) + send_messages_within_protocol_message queue.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE) end end def send_messages_within_protocol_message(messages) client.connection.send_protocol_message( @@ -140,18 +229,52 @@ messages: messages ) end def send_attach_protocol_message + send_state_change_protocol_message Models::ProtocolMessage::ACTION.Attach + end + + def send_detach_protocol_message + send_state_change_protocol_message Models::ProtocolMessage::ACTION.Detach + end + + def send_state_change_protocol_message(state) client.connection.send_protocol_message( - action: Models::ProtocolMessage::ACTION.Attach.to_i, + action: state.to_i, channel: name ) end - # Used by {Ably::Modules::State} to debug state changes + def create_message(name, data) + model = { + name: name, + data: data + } + model.merge!(clientId: client.client_id) if client.client_id + + Models::Message.new(model, nil) + end + + def rest_channel + client.rest_client.channel(name) + end + + # Used by {Ably::Modules::StateEmitter} to debug state changes def logger client.logger + end + + def connection + client.connection + end + + def message_name_key(name) + if name == :all + :all + else + name.to_s + end end end end end