Sha256: 0ebd386309add148dfd7e645a45a21d764a126fcade2e6866cb38e1dd45544f8
Contents?: true
Size: 1.24 KB
Versions: 51
Compression:
Stored size: 1.24 KB
Contents
require 'set'

module Concurrent
  module Actor
    module Utils

      # Allows to build pub/sub easily.
      #
      # The actor keeps a Set of subscribed receivers. Three control messages
      # act on the sender of the envelope currently being processed:
      #
      # * `:subscribe`   - adds the sender; replies `true`, or `false` when the
      #   sender is not a `Reference`.
      # * `:unsubscribe` - removes the sender; replies `true` only when the
      #   sender had been subscribed.
      # * `:subscribed?` - replies whether the sender is currently subscribed.
      #
      # Any other message is forwarded with `<<` to every receiver returned by
      # {#filtered_receivers}.
      #
      # @example news
      #   news_channel = Concurrent::Actor::Utils::Broadcast.spawn :news
      #
      #   2.times do |i|
      #     Concurrent::Actor::Utils::AdHoc.spawn "listener-#{i}" do
      #       news_channel << :subscribe
      #       -> message { puts message }
      #     end
      #   end
      #
      #   news_channel << 'Ruby rocks!'
      #   # prints: 'Ruby rocks!' twice
      class Broadcast < RestartingContext

        # Starts with an empty set of receivers.
        def initialize
          @receivers = Set.new
        end

        # Handles subscription control messages and broadcasts everything else.
        #
        # @param message [:subscribe, :unsubscribe, :subscribed?, Object]
        #   a control symbol, or an arbitrary payload to broadcast
        # @return [Boolean] for the control symbols (see the class documentation)
        # @return [Object] for any other message, whatever `each` returns when
        #   called on {#filtered_receivers}
        def on_message(message)
          # NOTE(review): `envelope` and `Reference` are defined outside this
          # file; presumably `envelope.sender` is the originator of the message
          # being processed - confirm against the actor context.
          sender = envelope.sender

          case message
          when :subscribe
            # Only references can be messaged back later, so reject anything else.
            return false unless sender.is_a? Reference

            @receivers.add sender
            true
          when :unsubscribe
            # Set#delete always returns the set itself (always truthy), so it
            # cannot tell whether anything was removed. Set#delete? returns nil
            # when the element was absent, which gives an accurate reply.
            !!@receivers.delete?(sender)
          when :subscribed?
            @receivers.include? sender
          else
            filtered_receivers.each { |receiver| receiver << message }
          end
        end

        # override to define different behaviour, filtering etc
        #
        # @return [Set] the receivers a broadcast message is delivered to; by
        #   default every subscribed receiver
        def filtered_receivers
          @receivers
        end
      end

    end
  end
end
Version data entries
51 entries across 51 versions & 2 rubygems