Sha256: f77661867e03482fe50457a07ccad51d348e4546d62490aaf41b4eb8b3efface
Contents?: true
Size: 1.18 KB
Versions: 3
Compression:
Stored size: 1.18 KB
Contents
require 'concurrent/actor/context'

module Concurrent
  module Actor
    module Utils

      # Hands incoming messages out to subscribed actors, one message per
      # subscription: a receiver is removed from the waiting list as soon as a
      # message is redirected to it, so it has to send `:subscribe` again once
      # it is ready for more work. Messages that arrive while no receiver is
      # waiting are kept in an internal buffer until one subscribes.
      # @see Pool
      class Balancer < RestartingContext

        # Starts with no waiting receivers and no buffered messages.
        def initialize
          @receivers = []
          @buffer    = []
        end

        # Handles control commands and queues everything else for delivery.
        #
        # A message is destructured as `command, target`; when `target` is
        # absent the sender of the current envelope is used instead.
        #
        # - `:subscribe`   appends the actor to the waiting list, then tries to
        #                  deliver buffered messages; returns `true`.
        # - `:unsubscribe` removes the actor from the waiting list; returns `true`.
        # - `:subscribed?` returns whether the actor is in the waiting list.
        # - anything else  buffers the current envelope, tries to deliver, and
        #                  returns `Behaviour::MESSAGE_PROCESSED`.
        #
        # @param message the received message, either a bare command or a
        #   `[command, actor]` pair
        def on_message(message)
          verb, target = message

          case verb
          when :subscribe
            subscriber = target || envelope.sender
            @receivers.push(subscriber)
            distribute
            true
          when :unsubscribe
            leaver = target || envelope.sender
            @receivers.delete(leaver)
            true
          when :subscribed?
            candidate = target || envelope.sender
            @receivers.include?(candidate)
          else
            # Not a control command: queue the whole envelope for a receiver.
            @buffer.push(envelope)
            distribute
            Behaviour::MESSAGE_PROCESSED
          end
        end

        # Pairs the oldest waiting receiver with the oldest buffered envelope
        # and passes both to `redirect`, repeating until either list is empty.
        def distribute
          until @receivers.empty? || @buffer.empty?
            next_receiver = @receivers.shift
            next_envelope = @buffer.shift
            redirect next_receiver, next_envelope
          end
        end
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems