lib/ably/realtime/channel.rb in ably-0.7.2 vs lib/ably/realtime/channel.rb in ably-0.7.4
- old
+ new
@@ -31,10 +31,11 @@
class Channel
include Ably::Modules::Conversions
include Ably::Modules::EventEmitter
include Ably::Modules::EventMachineHelpers
include Ably::Modules::AsyncWrapper
+ include Ably::Modules::MessageEmitter
extend Ably::Modules::Enum
STATE = ruby_enum('STATE',
:initialized,
:attaching,
@@ -82,18 +83,18 @@
ensure_utf_8 :name, name
@client = client
@name = name
@options = channel_options.clone.freeze
- @subscriptions = Hash.new { |hash, key| hash[key] = [] }
@queue = []
@state_machine = ChannelStateMachine.new(self)
@state = STATE(state_machine.current_state)
@manager = ChannelManager.new(self, client.connection)
setup_event_handlers
+ setup_presence
end
# Publish a message on the channel
#
# @param name [String] The event name of the message
@@ -121,45 +122,37 @@
end
end
# Subscribe to messages matching providing event name, or all messages if event name not provided
#
- # @param name [String] The event name of the message to subscribe to if provided. Defaults to all events.
+ # @param names [String] The event name of the message to subscribe to if provided. Defaults to all events.
# @yield [Ably::Models::Message] For each message received, the block is called
#
# @return [void]
#
- def subscribe(name = :all, &callback)
+ def subscribe(*names, &callback)
attach unless attached? || attaching?
- subscriptions[message_name_key(name)] << callback
+ super
end
# Unsubscribe the matching block for messages matching providing event name, or all messages if event name not provided.
# If a block is not provided, all subscriptions will be unsubscribed
#
- # @param name [String] The event name of the message to subscribe to if provided. Defaults to all events.
+ # @param names [String] The event name of the message to subscribe to if provided. Defaults to all events.
#
# @return [void]
#
- def unsubscribe(name = :all, &callback)
- if message_name_key(name) == :all
- subscriptions.keys
- else
- Array(message_name_key(name))
- end.each do |key|
- subscriptions[key].delete_if do |block|
- !block_given? || callback == block
- end
- end
+ def unsubscribe(*names, &callback)
+ super
end
# Attach to this channel, and call the block if provided when attached.
# Attaching to a channel is implicit in when a message is published or #subscribe is called, so it is uncommon
# to need to call attach explicitly.
#
# @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state
- # @return [EventMachine::Deferrable] Deferrable that supports both success (callback) and failure (errback) callback
+ # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback
#
def attach(&success_block)
transition_state_machine :attaching if can_transition_to?(:attaching)
deferrable_for_state_change_to(STATE.Attached, &success_block)
end
@@ -181,21 +174,21 @@
#
# @return {Ably::Realtime::Presence}
#
def presence
attach if initialized?
- @presence ||= Presence.new(self)
+ @presence
end
# Return the message history of the channel
#
# @param (see Ably::Rest::Channel#history)
# @option options (see Ably::Rest::Channel#history)
#
# @yield [Ably::Models::PaginatedResource<Ably::Models::Message>] An Array of {Ably::Models::Message} objects that supports paging (#next_page, #first_page)
#
- # @return [EventMachine::Deferrable]
+ # @return [Ably::Util::SafeDeferrable]
def history(options = {}, &callback)
async_wrap(callback) do
rest_channel.history(options.merge(async_blocking_operations: true))
end
end
@@ -212,25 +205,28 @@
# @api private
def set_failed_channel_error_reason(error)
@error_reason = error
end
+ # @api private
+ def clear_error_reason
+ @error_reason = nil
+ end
+
# Used by {Ably::Modules::StateEmitter} to debug state changes
# @api private
def logger
client.logger
end
private
- attr_reader :queue, :subscriptions
+ attr_reader :queue
def setup_event_handlers
__incoming_msgbus__.subscribe(:message) do |message|
message.decode self
-
- subscriptions[:all].each { |cb| cb.call(message) }
- subscriptions[message.name].each { |cb| cb.call(message) }
+ emit_message message.name, message
end
on(STATE.Attached) do
process_queue
end
@@ -271,11 +267,11 @@
def create_message(name, data)
message = { name: name }
message.merge!(data: data) unless data.nil?
message.merge!(clientId: client.client_id) if client.client_id
- Ably::Models::Message.new(message, nil).tap do |message|
+ Ably::Models::Message.new(message, logger: logger).tap do |message|
message.encode self
end
end
def rest_channel
@@ -284,15 +280,14 @@
def connection
client.connection
end
- def message_name_key(name)
- if name == :all
- :all
- else
- name.to_s
- end
+ def setup_presence
+ @presence ||= Presence.new(self)
end
end
end
end
+
+require 'ably/realtime/channel/channel_manager'
+require 'ably/realtime/channel/channel_state_machine'