Sha256: e95a98f566b7437fffb2eae57e30177243c9f07f87e6a55224e84081f8711d96

Contents?: true

Size: 1.03 KB

Versions: 1

Compression:

Stored size: 1.03 KB

Contents

module Rasti
  module Web

    class Channel

      attr_reader :id

      def initialize(id)
        @id = id
        @subscriptions = {}
        @mutex = Mutex.new
      end

      def subscribe
        stream = Stream.new
        
        subscription_id = broadcaster.subscribe id do |message|
          if stream.open?
            stream.write message
          else
            sid = mutex.synchronize { subscriptions.delete stream }
            broadcaster.unsubscribe sid
          end
        end

        mutex.synchronize { subscriptions[stream] = subscription_id }

        stream
      end

      def publish(message)
        broadcaster.publish id, message
      end

      def self.broadcaster
        @broadcaster ||= Broadcaster.new id: Web.channels_prefix, logger: Web.logger
      end

      def self.[](id)
        @channels ||= Hash.new { |h,k| h[k] = self.new k }
        @channels[id]
      end

      private

      attr_reader :subscriptions, :mutex

      def broadcaster
        self.class.broadcaster
      end

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
rasti-web-0.2.3 lib/rasti/web/channel.rb