Sha256: 8c3f7b783f4a2f3410641b702b5ee0bd62533ccd56a47608ed2bc9b0adfe77aa

Contents?: true

Size: 1.9 KB

Versions: 1

Compression:

Stored size: 1.9 KB

Contents

require 'singleton'
require 'forwardable'

module AsyncCable
  # Process-wide registry of connections, grouped by channel name and then by
  # stream name. All reads and writes go through a single mutex.
  #
  # The public methods are also exposed on the class itself
  # (e.g. `AsyncCable::Registry.add(...)`), delegating to the singleton instance.
  class Registry
    include Singleton
    extend SingleForwardable

    single_delegate [:add, :remove, :find, :all] => :instance

    # The mutex and the storage must be per-instance state: an `@ivar` assigned
    # in the class body belongs to the class object and is invisible (nil)
    # inside instance methods.
    def initialize
      @mutex = Mutex.new
      # { channel_name => { stream_name => [connection, ...] } }
      @subscribers = {}
    end

    # Adds connection to registry.
    # @param channel_name [String]
    # @param stream_name [String]
    # @param connection [AsyncCable::Connection]
    # @return [AsyncCable::Connection] the connection that was passed in.
    def add(channel_name, stream_name, connection)
      @mutex.synchronize do
        streams = (@subscribers[channel_name] ||= {})
        (streams[stream_name] ||= []).push(connection)
      end
      connection
    end

    # Removes connection from registry. Streams and channels left without any
    # connection are dropped so the registry does not accumulate empty entries.
    # Removing a connection that was never registered is a no-op.
    # @param channel_name [String]
    # @param stream_name [String]
    # @param connection [AsyncCable::Connection]
    # @return [AsyncCable::Connection] the connection that was passed in.
    def remove(channel_name, stream_name, connection)
      @mutex.synchronize do
        streams = @subscribers[channel_name]
        connections = streams && streams[stream_name]
        if connections
          connections.delete(connection)
          streams.delete(stream_name) if connections.empty?
          @subscribers.delete(channel_name) if streams.empty?
        end
      end
      connection
    end

    # Return all connections from all channels when `channel_name` omitted.
    # Return all connections from channel when `stream_name` omitted.
    # Return connections from channel stream when `channel_name` and `stream_name` provided.
    #
    # Lookups never create entries, and the returned array is always a fresh
    # copy, so callers may iterate or modify it without touching registry state.
    # @param channel_name [String,NilClass]
    # @param stream_name [String,NilClass]
    # @return [Array<AsyncCable::Connection>,Array] empty array when nothing matches.
    def find(channel_name = nil, stream_name = nil)
      @mutex.synchronize do
        if channel_name.nil?
          @subscribers.values.flat_map { |streams| streams.values.flatten(1) }
        elsif stream_name.nil?
          @subscribers.fetch(channel_name, {}).values.flatten(1)
        else
          @subscribers.fetch(channel_name, {}).fetch(stream_name, []).dup
        end
      end
    end

    # Returns every registered connection across all channels and streams.
    # Backs the class-level `Registry.all` delegator.
    # @return [Array<AsyncCable::Connection>,Array]
    def all
      find
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
async_cable-0.1.0 lib/async_cable/registry.rb