Sha256: 71911bce0a8284023b4b5f532302620c915aafecf6dcc2564f463cd1d825ca53
Contents?: true
Size: 1.12 KB
Versions: 1
Compression:
Stored size: 1.12 KB
Contents
module Rasti
  module Web
    # Relays messages received on a Restruct channel to every open stream
    # that was handed out by #subscribe.
    #
    # Instances are normally obtained through Channel[], which memoizes one
    # instance per id.
    class Channel

      attr_reader :streams

      # Builds the underlying Restruct channel for +id+ (namespaced under
      # Web.channels_prefix) and starts listening on it in a background thread.
      #
      # @param id identifier appended to Web.channels_prefix
      def initialize(id)
        @mutex = Mutex.new
        @channel = Restruct::Channel.new id: Restruct::Id[Web.channels_prefix][id]
        @streams = []
        listen
      end

      # Creates a new Stream and registers it to receive broadcast messages.
      #
      # @return [Stream] the newly registered stream
      def subscribe
        Stream.new.tap do |stream|
          mutex.synchronize { streams << stream }
        end
      end

      # Publishes +message+ on the underlying Restruct channel.
      def publish(message)
        channel.publish message
      end

      # Returns the memoized channel for +id+, creating it on first access.
      #
      # The default proc must build the channel from the missing *key*: the
      # hash is created only once, so any local captured from the enclosing
      # call would stay frozen at the value seen by the first caller.
      #
      # @param id channel identifier
      # @return [Channel]
      def self.[](id)
        @channels ||= Hash.new { |channels, key| channels[key] = new(key) }
        @channels[id]
      end

      private

      attr_reader :mutex, :channel

      # Subscribes to the underlying channel in a separate thread and
      # broadcasts every message it yields.
      def listen
        Thread.new do
          channel.subscribe { |message| broadcast message }
        end
      end

      # Writes +message+ to every stream that is still open. Closed streams
      # are removed first; the whole operation runs while holding the mutex.
      def broadcast(message)
        mutex.synchronize do
          streams.delete_if(&:closed?)

          unless streams.empty?
            Rasti::Web.logger.debug(Channel) { "Broadcasting (#{streams.count} connections) -> #{message}" }
          end

          streams.each { |stream| stream.write message }
        end
      end

    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
rasti-web-0.2.0 | lib/rasti/web/channel.rb |