Sha256: 7ea6f19903570683f0961f607b1c84ba05ce7e777af699136d7b1be730c7211f

Contents?: true

Size: 1.95 KB

Versions: 1

Compression:

Stored size: 1.95 KB

Contents

module Faye
  # State kept for a single client connection, identified by +id+.
  #
  # A connection registers itself as an observer of the channels it is
  # subscribed to, collects the events they publish in an inbox, and passes
  # the inbox contents to the callbacks registered through #connect each time
  # it is flushed.
  class Connection
    include EventMachine::Deferrable
    include Observable
    include Timeouts

    # Delay given to the :delivery timeout once the inbox is non-empty
    # (presumably seconds; Timeouts#add_timeout is defined elsewhere).
    MAX_DELAY = 0.1
    # Base unit for the :deletion timeout, which is scheduled at 10 * INTERVAL.
    INTERVAL  = 1.0
    # Default delay for the :connection timeout when no :timeout option is given.
    TIMEOUT   = 60.0

    attr_reader :id

    # id      - identifier exposed through #id.
    # options - Hash; :timeout overrides TIMEOUT for the :connection timeout.
    def initialize(id, options = {})
      @id        = id
      @options   = options
      @timeout   = @options[:timeout] || TIMEOUT
      @channels  = Set.new
      @inbox     = Set.new
      @connected = false
    end

    # Observer callback. Notifications other than :message are ignored; for a
    # :message the event is queued in the inbox and the delivery timeout is
    # started (which only happens while connected).
    def update(message, event)
      return unless message == :message

      @inbox.add(event)
      begin_delivery_timeout!
    end

    # Starts observing +channel+. Set#add? returns nil for a channel that is
    # already tracked, so the observer is registered only once per channel.
    def subscribe(channel)
      channel.add_observer(self) if @channels.add?(channel)
    end

    # Stops observing +channel+, or every subscribed channel when +channel+
    # is :all. Channels this connection is not subscribed to are ignored.
    #
    # Returns the (now empty) channel set for :all, nil for an unknown
    # channel, and otherwise whatever channel.delete_observer returns.
    def unsubscribe(channel)
      if channel == :all
        # Walk a snapshot: each nested call deletes from @channels, and the
        # live set should not be mutated while it is being iterated.
        @channels.to_a.each { |subscribed| unsubscribe(subscribed) }
        return @channels
      end

      # Set#delete? returns nil when the channel was not in the set.
      return unless @channels.delete?(channel)

      channel.delete_observer(self)
    end

    # Registers +block+ as a deferrable callback, then, unless the connection
    # is already marked connected, marks it connected, cancels any pending
    # :deletion timeout and starts the delivery and connection timeouts.
    def connect(&block)
      callback(&block)
      return if @connected

      @connected = true
      remove_timeout(:deletion)

      begin_delivery_timeout!
      begin_connection_timeout!
    end

    # Delivers the queued events to the registered callbacks and releases the
    # connection. Does nothing when the connection is not connected.
    def flush!
      return unless @connected

      release_connection!

      events = @inbox.entries
      @inbox = Set.new

      set_deferred_status(:succeeded, events)
      # Put the deferrable back into the :deferred state after succeeding,
      # presumably so callbacks added by a later #connect wait for the next
      # flush instead of firing immediately.
      set_deferred_status(:deferred)
    end

    # Unsubscribes from every channel and flushes whatever is in the inbox.
    def disconnect!
      unsubscribe(:all)
      flush!
    end

  private

    # Schedules a flush after MAX_DELAY, but only while connected and only
    # when there is something in the inbox to deliver.
    def begin_delivery_timeout!
      return unless @connected
      return if @inbox.empty?

      add_timeout(:delivery, MAX_DELAY) { flush! }
    end

    # Schedules a flush after the configured timeout while connected.
    def begin_connection_timeout!
      return unless @connected

      add_timeout(:connection, @timeout) { flush! }
    end

    # Cancels the :connection and :delivery timeouts, marks the connection as
    # not connected, and schedules a :deletion timeout that notifies this
    # connection's observers with :stale_connection when it fires.
    def release_connection!
      remove_timeout(:connection)
      remove_timeout(:delivery)
      @connected = false

      add_timeout(:deletion, 10 * INTERVAL) do
        changed(true)
        notify_observers(:stale_connection, self)
      end
    end

  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
faye-0.3.3 lib/faye/connection.rb