lib/celluloid/actor.rb in celluloid-0.16.0.pre vs lib/celluloid/actor.rb in celluloid-0.16.0.pre2
- old
+ new
@@ -1,5 +1,6 @@
+
require 'timers'
module Celluloid
# Actors are Celluloid's concurrency primitive. They're implemented as
# normal Ruby objects wrapped in threads which communicate with asynchronous
@@ -110,11 +111,11 @@
@tasks = TaskSet.new
@links = Links.new
@signals = Signals.new
@receivers = Receivers.new
- @timers = Timers.new
+ @timers = Timers::Group.new
@handlers = Handlers.new
@running = false
@name = nil
handle(SystemEvent) do |message|
@@ -168,34 +169,40 @@
end
# Perform a linking request with another actor
def linking_request(receiver, type)
Celluloid.exclusive do
- start_time = Time.now
+ linking_timeout = Timers::Timeout.new(LINKING_TIMEOUT)
+
receiver.mailbox << LinkingRequest.new(Actor.current, type)
system_events = []
- loop do
- wait_interval = start_time + LINKING_TIMEOUT - Time.now
- message = @mailbox.receive(wait_interval) do |msg|
- msg.is_a?(LinkingResponse) &&
- msg.actor.mailbox.address == receiver.mailbox.address &&
- msg.type == type
+ linking_timeout.while_time_remaining do |remaining|
+ begin
+ message = @mailbox.receive(remaining) do |msg|
+ msg.is_a?(LinkingResponse) &&
+ msg.actor.mailbox.address == receiver.mailbox.address &&
+ msg.type == type
+ end
+ rescue TimeoutError
+ next # IO reactor did something, no message in queue yet.
end
if message.instance_of? LinkingResponse
Celluloid::Probe.actors_linked(self, receiver) if $CELLULOID_MONITORING
+
# We're done!
system_events.each { |ev| @mailbox << ev }
+
return
- elsif message.instance_of? NilClass
- raise TimeoutError, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded"
- elsif message.instance_of? SystemEvent
+ elsif message.is_a? SystemEvent
# Queue up pending system events to be processed after we've successfully linked
system_events << message
- else raise 'wtf'
+ else raise "Unexpected message type: #{message.class}. Expected LinkingResponse, NilClass, SystemEvent."
end
end
+
+ raise TimeoutError, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded"
end
end
# Send a signal with the given name to all waiting methods
def signal(name, value = nil)