Sha256: 71911bce0a8284023b4b5f532302620c915aafecf6dcc2564f463cd1d825ca53

Contents?: true

Size: 1.12 KB

Versions: 1

Compression:

Stored size: 1.12 KB

Contents

module Rasti
  module Web
    # Fans messages received on a Restruct channel out to locally subscribed
    # streams.
    #
    # Every instance starts a background thread (see #listen) that subscribes
    # to the underlying Restruct::Channel and writes each incoming message to
    # all streams that are not closed.
    class Channel

      # Serializes lookup/creation in Channel.[] so that concurrent callers
      # asking for the same id never build two channels (and two listener
      # threads) for it.
      REGISTRY_MUTEX = Mutex.new
      private_constant :REGISTRY_MUTEX

      # Streams currently registered on this channel. The array itself is
      # returned (not a copy); it is mutated under an internal mutex by
      # #subscribe and by each broadcast.
      attr_reader :streams

      # Builds the channel and immediately starts listening for messages.
      #
      # @param id identifier appended to Web.channels_prefix to build the
      #   Restruct id of the underlying channel
      def initialize(id)
        @mutex = Mutex.new
        @channel = Restruct::Channel.new id: Restruct::Id[Web.channels_prefix][id]
        @streams = []
        listen
      end

      # Creates a new Stream and registers it to receive future broadcasts.
      #
      # @return [Stream] the newly registered stream
      def subscribe
        Stream.new.tap do |stream|
          mutex.synchronize do
            streams << stream
          end
        end
      end

      # Publishes a message through the underlying Restruct channel.
      #
      # @param message payload handed to Restruct::Channel#publish
      # @return whatever Restruct::Channel#publish returns
      def publish(message)
        channel.publish message
      end

      # Returns the channel registered for +id+, creating it on first access.
      # The registry is kept per class, so repeated calls with the same id
      # return the same instance.
      #
      # @param id channel identifier
      # @return [Channel]
      def self.[](id)
        REGISTRY_MUTEX.synchronize do
          @channels ||= {}
          @channels[id] ||= new(id)
        end
      end

      private

      attr_reader :mutex, :channel

      # Spawns the thread that forwards every message received on the
      # underlying channel to #broadcast.
      def listen
        Thread.new do
          channel.subscribe do |message|
            broadcast message
          end
        end
      end

      # Drops closed streams and writes +message+ to each remaining one.
      # Runs under the mutex so the stream list cannot change mid-iteration.
      def broadcast(message)
        mutex.synchronize do
          streams.delete_if(&:closed?)
          Rasti::Web.logger.debug(Channel) { "Broadcasting (#{streams.count} connections) -> #{message}" } unless streams.empty?
          streams.each do |stream|
            deliver stream, message
          end
        end
      end

      # Writes +message+ to a single stream, containing any failure.
      #
      # An exception escaping from here would unwind through #broadcast and
      # terminate the listener thread, silencing the channel for every
      # subscriber. A write may fail, for instance, if the stream gets closed
      # by another thread after the pruning step (Stream is defined elsewhere;
      # verify its exact failure modes there).
      #
      # NOTE(review): assumes the logger responds to #error with the same
      # (progname) { message } signature used for #debug above - confirm.
      def deliver(stream, message)
        stream.write message
      rescue StandardError => ex
        Rasti::Web.logger.error(Channel) { "Error writing to stream: #{ex.class}: #{ex.message}" }
      end

    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
rasti-web-0.2.0 lib/rasti/web/channel.rb