lib/celluloid/actor.rb in celluloid-0.5.0 vs lib/celluloid/actor.rb in celluloid-0.6.0

- old
+ new

@@ -1,20 +1,5 @@ -require 'thread' - -begin - require 'fiber' -rescue LoadError => ex - if defined? JRUBY_VERSION - raise LoadError, "Celluloid requires JRuby 1.9 mode. Please pass the --1.9 flag or set JRUBY_OPTS=--1.9" - elsif defined? Rubinius - # If we're on Rubinius, we can still work in 1.8 mode - Fiber = Rubinius::Fiber - else - raise ex - end -end - module Celluloid # Don't do Actor-like things outside Actor scope class NotActorError < StandardError; end # Trying to do something to a dead actor @@ -39,51 +24,71 @@ attr_reader :proxy attr_reader :links attr_reader :mailbox + # Invoke a method on the given actor via its mailbox + def self.call(mailbox, meth, *args, &block) + our_mailbox = Thread.current.mailbox + call = SyncCall.new(our_mailbox, meth, args, block) - # Wrap the given subject object with an Actor - def initialize(subject) - @subject = subject - @mailbox = initialize_mailbox - @links = Links.new - @signals = Signals.new - @proxy = ActorProxy.new(self, @mailbox) - @running = true + begin + mailbox << call + rescue MailboxError + raise DeadActorError, "attempted to call a dead actor" + end - @thread = Pool.get - @thread[:queue] << proc do - initialize_thread_locals - run + if Celluloid.actor? + # Yield to the actor scheduler, which resumes us when we get a response + response = Fiber.yield(call) + else + # Otherwise we're inside a normal thread, so block + response = our_mailbox.receive do |msg| + msg.is_a? Response and msg.call_id == call.id + end end - end - # Create the mailbox for this actor - # - # This implemententation is intended to be overridden in special-case - # subclasses of Celluloid::Actor which use a custom mailbox - def initialize_mailbox - Mailbox.new + response.value end - # Configure thread locals for the running thread - def initialize_thread_locals - Thread.current[:actor] = self - Thread.current[:actor_proxy] = @proxy - Thread.current[:mailbox] = @mailbox + # Invoke a method asynchronously on an actor via its mailbox + def self.async(mailbox, meth, *args, &block) + our_mailbox = Thread.current.mailbox + begin + mailbox << AsyncCall.new(our_mailbox, meth, args, block) + rescue MailboxError + # Silently swallow asynchronous calls to dead actors. There's no way + # to reliably generate DeadActorErrors for async calls, so users of + # async calls should find other ways to deal with actors dying + # during an async call (i.e. linking/supervisors) + end end - # Run the actor loop - def run - process_messages - cleanup ExitEvent.new(@proxy) - rescue Exception => ex - @running = false - handle_crash(ex) - ensure - Pool.put @thread + # Wrap the given subject with an Actor + def initialize(subject) + @subject = subject + + if subject.respond_to? :mailbox_factory + @mailbox = subject.mailbox_factory + else + @mailbox = Celluloid::Mailbox.new + end + + @links = Links.new + @signals = Signals.new + @receivers = Receivers.new + @proxy = ActorProxy.new(@mailbox) + @running = true + @pending_calls = {} + + @thread = Pool.get do + Thread.current[:actor] = self + Thread.current[:actor_proxy] = @proxy + Thread.current[:mailbox] = @mailbox + + run + end end # Is this actor alive? def alive? @running @@ -103,60 +108,76 @@ # Wait for the given signal def wait(name) @signals.wait name end - ####### - private - ####### + # Receive an asynchronous message + def receive(&block) + @receivers.receive(&block) + end - # Process incoming messages - def process_messages - pending_calls = {} - + # Run the actor loop + def run while @running begin message = @mailbox.receive - rescue MailboxShutdown - # If the mailbox detects shutdown, exit the actor - @running = false rescue ExitEvent => exit_event - fiber = Fiber.new do - initialize_thread_locals - handle_exit_event exit_event - end - - call = fiber.resume - pending_calls[call] = fiber if fiber.alive? - + fiber = Celluloid.fiber { handle_exit_event exit_event; nil } + run_fiber fiber retry end - case message - when Call - fiber = Fiber.new do - initialize_thread_locals - message.dispatch(@subject) - end + handle_message message + end - call = fiber.resume - pending_calls[call] = fiber if fiber.alive? - when Response - fiber = pending_calls.delete(message.call) + cleanup ExitEvent.new(@proxy) + rescue MailboxShutdown + # If the mailbox detects shutdown, exit the actor + @running = false + rescue Exception => ex + @running = false + handle_crash(ex) + ensure + Pool.put @thread + end - if fiber - call = fiber.resume message - pending_calls[call] = fiber if fiber.alive? - end - end # unexpected messages are ignored + # Run a method, handling when its Fiber is suspended + def run_fiber(fiber, value = nil) + result = fiber.resume value + if result.is_a? Celluloid::Call + @pending_calls[result.id] = fiber if fiber.alive? + elsif result + warning = "non-call returned from fiber: #{result.class}" + Celluloid.logger.debug warning if Celluloid.logger end + nil end + # Handle an incoming message + def handle_message(message) + case message + when Call + fiber = Celluloid.fiber { message.dispatch(@subject); nil } + run_fiber fiber + when Response + fiber = @pending_calls.delete(message.call_id) + + if fiber + run_fiber fiber, message + else + warning = "spurious response to call #{message.call_id}" + Celluloid.logger.debug if Celluloid.logger + end + else + @receivers.handle_message(message) + end + message + end + # Handle exit events received by this actor def handle_exit_event(exit_event) - klass = @subject.class - exit_handler = klass.exit_handler if klass.respond_to? :exit_handler + exit_handler = @subject.class.exit_handler if exit_handler return @subject.send(exit_handler, exit_event.actor, exit_event.reason) end # Reraise exceptions from linked actors @@ -174,9 +195,15 @@ # Handle cleaning up this actor after it exits def cleanup(exit_event) @mailbox.shutdown @links.send_event exit_event + + begin + @subject.finalize if @subject.respond_to? :finalize + rescue Exception => finalizer_exception + log_error(finalizer_exception, "#{@subject.class}#finalize crashed!") + end end # Log errors when an actor crashes def log_error(ex, message = "#{@subject.class} crashed!") message << "\n#{ex.class}: #{ex.to_s}\n"