lib/ably/realtime/channel.rb in ably-0.1.1 vs lib/ably/realtime/channel.rb in ably-0.1.2

- old
+ new

@@ -1,51 +1,95 @@ module Ably module Realtime class Channel include Callbacks + STATES = { + initialised: 1, + attaching: 2, + attached: 3, + detaching: 4, + detached: 5, + failed: 6 + }.freeze + attr_reader :client, :name + # Retrieve a state symbol by the integer value + def self.state_sym_for(state_int) + @states_index_by_int ||= STATES.invert.freeze + @states_index_by_int[state_int] + end + def initialize(client, name) - @state = :initialised - @client = client - @name = name - @subscriptions = Hash.new { |hash, key| hash[key] = [] } + @client = client + @name = name + @subscriptions = Hash.new { |hash, key| hash[key] = [] } + @queue = [] - on(:message) do |message| - event = message[:name] + set_state :initialised + on(:message) do |message| @subscriptions[:all].each { |cb| cb.call(message) } - @subscriptions[event].each { |cb| cb.call(message) } + @subscriptions[message.name].each { |cb| cb.call(message) } end + + on(:attached) do + set_state :attached + process_queue + end end + # Current Channel state, will always be one of {STATES} + # + # @return [Symbol] state + def state + self.class.state_sym_for(@state) + end + + def state?(check_state) + check_state = STATES.fetch(check_state) if check_state.kind_of?(Symbol) + @state == check_state + end + def publish(event, data) - message = { name: event, data: data } + queue << { name: event, data: data, timestamp: Time.now.to_i * 1000 } if attached? - client.send_message(name, message) + process_queue else - on(:attached) { client.send_message(name, message) } attach end end def subscribe(event = :all, &blk) + event = event.to_s unless event == :all + attach unless attached? @subscriptions[event] << blk end - private - def attached? - @state == :attached - end - def attach - unless @state == :attaching - @state = :attaching + unless state?(:attaching) + set_state :attaching client.attach_to_channel(name) - on(:attached) { @state = :attached } end + end + + def attached? + state?(:attached) + end + + private + attr_reader :queue + + def set_state(new_state) + new_state = STATES.fetch(new_state) if new_state.kind_of?(Symbol) + raise ArgumentError, "#{new_state} is not a valid state" unless STATES.values.include?(new_state) + @state = new_state + end + + def process_queue + client.send_messages(name, queue.shift(100)) until queue.empty? end end end end