lib/celluloid/actor.rb in celluloid-0.16.0.pre vs lib/celluloid/actor.rb in celluloid-0.16.0.pre2

- old
+ new

@@ -1,5 +1,6 @@ + require 'timers' module Celluloid # Actors are Celluloid's concurrency primitive. They're implemented as # normal Ruby objects wrapped in threads which communicate with asynchronous @@ -110,11 +111,11 @@ @tasks = TaskSet.new @links = Links.new @signals = Signals.new @receivers = Receivers.new - @timers = Timers.new + @timers = Timers::Group.new @handlers = Handlers.new @running = false @name = nil handle(SystemEvent) do |message| @@ -168,34 +169,40 @@ end # Perform a linking request with another actor def linking_request(receiver, type) Celluloid.exclusive do - start_time = Time.now + linking_timeout = Timers::Timeout.new(LINKING_TIMEOUT) + receiver.mailbox << LinkingRequest.new(Actor.current, type) system_events = [] - loop do - wait_interval = start_time + LINKING_TIMEOUT - Time.now - message = @mailbox.receive(wait_interval) do |msg| - msg.is_a?(LinkingResponse) && - msg.actor.mailbox.address == receiver.mailbox.address && - msg.type == type + linking_timeout.while_time_remaining do |remaining| + begin + message = @mailbox.receive(remaining) do |msg| + msg.is_a?(LinkingResponse) && + msg.actor.mailbox.address == receiver.mailbox.address && + msg.type == type + end + rescue TimeoutError + next # IO reactor did something, no message in queue yet. end if message.instance_of? LinkingResponse Celluloid::Probe.actors_linked(self, receiver) if $CELLULOID_MONITORING + # We're done! system_events.each { |ev| @mailbox << ev } + return - elsif message.instance_of? NilClass - raise TimeoutError, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded" - elsif message.instance_of? SystemEvent + elsif message.is_a? SystemEvent # Queue up pending system events to be processed after we've successfully linked system_events << message - else raise 'wtf' + else raise "Unexpected message type: #{message.class}. Expected LinkingResponse, NilClass, SystemEvent." end end + + raise TimeoutError, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded" end end # Send a signal with the given name to all waiting methods def signal(name, value = nil)