Sha256: 836b8d2dca7cb4b285c635ccd7632a43561c4bfa2d3d8f18742597405d25badf
Contents?: true
Size: 1.15 KB
Versions: 24
Compression:
Stored size: 1.15 KB
Contents
module Concurrent
  module Actor
    module Utils

      # Hands incoming messages out to subscribed actors, one message per
      # subscription: a receiver is removed from the waiting list as soon as a
      # message has been redirected to it, so it has to send `:subscribe` again
      # once it is ready for the next one. Messages arriving while nobody is
      # subscribed are kept in a buffer until a receiver shows up.
      # @see Pool
      class Balancer < RestartingContext

        # Starts with no waiting receivers and no buffered messages.
        def initialize
          @receivers = []
          @buffer    = []
        end

        # Handles the control commands and balances every other message.
        #
        # Recognised commands (optionally sent as `[command, who]`; when `who`
        # is not given the sender of the current envelope is used):
        # - `:subscribe`   - queue the actor as a receiver, answers `true`
        # - `:unsubscribe` - drop the actor from the receivers, answers `true`
        # - `:subscribed?` - answers whether the actor is currently queued
        #
        # Anything else is treated as work: the current envelope is buffered,
        # a distribution round is attempted, and
        # `Behaviour::MESSAGE_PROCESSED` is returned.
        #
        # @param message [Object] the received message
        # @return [true, false, Object] see the command list above
        def on_message(message)
          command, who = message

          case command
          when :subscribe
            subscriber = who || envelope.sender
            @receivers.push(subscriber)
            distribute
            true
          when :unsubscribe
            subscriber = who || envelope.sender
            @receivers.delete(subscriber)
            true
          when :subscribed?
            subscriber = who || envelope.sender
            @receivers.include?(subscriber)
          else
            # Buffer the whole envelope, not just the message, so it can be
            # passed on unchanged by #distribute.
            @buffer.push(envelope)
            distribute
            Behaviour::MESSAGE_PROCESSED
          end
        end

        # Pairs the oldest waiting receiver with the oldest buffered envelope
        # and passes both to `redirect`, repeating until either list runs dry.
        def distribute
          until @receivers.empty? || @buffer.empty?
            redirect(@receivers.shift, @buffer.shift)
          end
        end
      end
    end
  end
end
Version data entries
24 entries across 24 versions & 1 rubygems