lib/celluloid/actor.rb in celluloid-0.6.0 vs lib/celluloid/actor.rb in celluloid-0.6.1

- old
+ new

@@ -18,20 +18,18 @@ # Actors are Celluloid's concurrency primitive. They're implemented as # normal Ruby objects wrapped in threads which communicate with asynchronous # messages. class Actor extend Registry - include Linking attr_reader :proxy attr_reader :links attr_reader :mailbox # Invoke a method on the given actor via its mailbox def self.call(mailbox, meth, *args, &block) - our_mailbox = Thread.current.mailbox - call = SyncCall.new(our_mailbox, meth, args, block) + call = SyncCall.new(Thread.mailbox, meth, args, block) begin mailbox << call rescue MailboxError raise DeadActorError, "attempted to call a dead actor" @@ -40,23 +38,22 @@ if Celluloid.actor? # Yield to the actor scheduler, which resumes us when we get a response response = Fiber.yield(call) else # Otherwise we're inside a normal thread, so block - response = our_mailbox.receive do |msg| - msg.is_a? Response and msg.call_id == call.id + response = Thread.mailbox.receive do |msg| + msg.respond_to?(:call_id) and msg.call_id == call.id end end response.value end # Invoke a method asynchronously on an actor via its mailbox def self.async(mailbox, meth, *args, &block) - our_mailbox = Thread.current.mailbox begin - mailbox << AsyncCall.new(our_mailbox, meth, args, block) + mailbox << AsyncCall.new(Thread.mailbox, meth, args, block) rescue MailboxError # Silently swallow asynchronous calls to dead actors. There's no way # to reliably generate DeadActorErrors for async calls, so users of # async calls should find other ways to deal with actors dying # during an async call (i.e. linking/supervisors) @@ -74,18 +71,17 @@ end @links = Links.new @signals = Signals.new @receivers = Receivers.new - @proxy = ActorProxy.new(@mailbox) + @proxy = ActorProxy.new(@mailbox, self.class.to_s) @running = true @pending_calls = {} @thread = Pool.get do - Thread.current[:actor] = self - Thread.current[:actor_proxy] = @proxy - Thread.current[:mailbox] = @mailbox + Thread.current[:actor] = self + Thread.current[:mailbox] = @mailbox run end end @@ -119,12 +115,11 @@ def run while @running begin message = @mailbox.receive rescue ExitEvent => exit_event - fiber = Celluloid.fiber { handle_exit_event exit_event; nil } - run_fiber fiber + Celluloid::Fiber.new { handle_exit_event exit_event; nil }.resume retry end handle_message message end @@ -138,32 +133,25 @@ handle_crash(ex) ensure Pool.put @thread end - # Run a method, handling when its Fiber is suspended - def run_fiber(fiber, value = nil) - result = fiber.resume value - if result.is_a? Celluloid::Call - @pending_calls[result.id] = fiber if fiber.alive? - elsif result - warning = "non-call returned from fiber: #{result.class}" - Celluloid.logger.debug warning if Celluloid.logger - end - nil + # Register a fiber waiting for the response to a Celluloid::Call + def register_fiber(call, fiber) + raise ArgumentError, "attempted to register a dead fiber" unless fiber.alive? + @pending_calls[call.id] = fiber end # Handle an incoming message def handle_message(message) case message when Call - fiber = Celluloid.fiber { message.dispatch(@subject); nil } - run_fiber fiber + Celluloid::Fiber.new { message.dispatch(@subject); nil }.resume when Response fiber = @pending_calls.delete(message.call_id) if fiber - run_fiber fiber, message + fiber.resume message else warning = "spurious response to call #{message.call_id}" Celluloid.logger.debug if Celluloid.logger end else