Sha256: 6b5f227cb478501271783375748be54a503e2e6f78758c8f469487885f72cc6e
Contents?: true
Size: 1.43 KB
Versions: 3
Compression:
Stored size: 1.43 KB
Contents
require 'set'
require 'timers'

module Celluloid
  module Internals
    # Lets methods interact with the actor protocol directly by keeping track
    # of every task that is currently suspended inside a call to #receive.
    class Receivers
      # timers - object responding to #after(interval) { ... }. Whatever it
      #          returns is stored as the receiver's timer and is later sent
      #          #cancel (presumably a Timers::Group; confirm against caller).
      def initialize(timers)
        @receivers = Set.new
        @timers = timers
      end

      # Wait for an asynchronous message.
      #
      # timeout - optional interval handed to the timers object; nil means
      #           no timer is armed.
      # block   - optional matcher; a message is delivered to this receiver
      #           only when the block returns a truthy value for it.
      #
      # When Celluloid.exclusive? is truthy the call is delegated to
      # Celluloid.mailbox.receive. Otherwise a Receiver is registered and the
      # current task is suspended with the :receiving status; the result of
      # Task.suspend is returned.
      def receive(timeout = nil, &block)
        return Celluloid.mailbox.receive(timeout, &block) if Celluloid.exclusive?

        waiter = Receiver.new(block)

        if timeout
          # On expiry, drop the registration and wake the task with no message.
          waiter.timer = @timers.after(timeout) do
            @receivers.delete(waiter)
            waiter.resume
          end
        end

        @receivers << waiter
        Task.suspend(:receiving)
      end

      # Offer an incoming message to the registered receivers.
      #
      # The first receiver whose matcher accepts the message is deregistered,
      # has its timer (if any) cancelled, and is resumed with the message.
      #
      # Returns the message when a receiver took it, nil otherwise.
      def handle_message(message)
        taker = @receivers.find { |candidate| candidate.match(message) }

        if taker
          @receivers.delete(taker)

          pending_timer = taker.timer
          pending_timer.cancel if pending_timer

          taker.resume(message)
          message
        end
      end
    end

    # A single task blocked in a call to receive, together with its optional
    # matcher block and optional timeout timer.
    class Receiver
      attr_accessor :timer

      # block - matcher proc, or nil to accept every message.
      #
      # The task returned by Task.current at construction time is the one
      # that #resume will wake.
      def initialize(block)
        @task = Task.current
        @block = block
        @timer = nil
      end

      # Does this receiver accept the message? Without a matcher block every
      # message is accepted; otherwise the block's own result is returned.
      def match(message)
        return true unless @block

        @block.call(message)
      end

      # Wake the captured task, passing it the message (nil by default).
      def resume(message = nil)
        @task.resume(message)
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems