lib/celluloid/receivers.rb in celluloid-0.6.2 vs lib/celluloid/receivers.rb in celluloid-0.7.0

- old
+ new

@@ -1,36 +1,66 @@ +require 'set' + module Celluloid # Allow methods to directly interact with the actor protocol class Receivers def initialize - @handlers = [] + @receivers = Set.new + @timers = Timers.new end # Receive an asynchronous message - def receive(&block) - raise ArgumentError, "receive must be given a block" unless block + def receive(timeout = nil, &block) + receiver = Receiver.new block - @handlers << [Fiber.current, block] - Fiber.yield + if timeout + receiver.timer = @timers.add(timeout) do + @receivers.delete receiver + receiver.resume + end + end + + @receivers << receiver + Task.suspend end + # How long to wait until the next timer fires + def wait_interval + @timers.wait_interval + end + + # Fire any pending timers + def fire_timers + @timers.fire + end + # Handle incoming messages def handle_message(message) - handler = nil + receiver = @receivers.find { |r| r.match(message) } + return unless receiver - @handlers.each_with_index do |(fiber, block), index| - if block.call(message) - handler = index - break - end - end + @receivers.delete receiver + @timers.cancel receiver.timer if receiver.timer + receiver.resume message + end + end - if handler - fiber, _ = @handlers.delete_at handler - fiber.resume message - true - else - false - end + # Methods blocking on a call to receive + class Receiver + attr_accessor :timer + + def initialize(block) + @block = block + @task = Task.current + @timer = nil + end + + # Match a message with this receiver's block + def match(message) + @block.call(message) if @block + end + + def resume(message = nil) + @task.resume message end end end