module Celluloid # Don't do Actor-like things outside Actor scope class NotActorError < StandardError; end # Trying to do something to a dead actor class DeadActorError < StandardError; end # The caller made an error, not the current actor class AbortError < StandardError attr_reader :cause def initialize(cause) @cause = cause super "caused by #{cause.inspect}: #{cause.to_s}" end end # Actors are Celluloid's concurrency primitive. They're implemented as # normal Ruby objects wrapped in threads which communicate with asynchronous # messages. class Actor extend Registry include Linking attr_reader :proxy attr_reader :links attr_reader :mailbox # Invoke a method on the given actor via its mailbox def self.call(mailbox, meth, *args, &block) our_mailbox = Thread.current.mailbox call = SyncCall.new(our_mailbox, meth, args, block) begin mailbox << call rescue MailboxError raise DeadActorError, "attempted to call a dead actor" end if Celluloid.actor? # Yield to the actor scheduler, which resumes us when we get a response response = Fiber.yield(call) else # Otherwise we're inside a normal thread, so block response = our_mailbox.receive do |msg| msg.is_a? Response and msg.call_id == call.id end end response.value end # Invoke a method asynchronously on an actor via its mailbox def self.async(mailbox, meth, *args, &block) our_mailbox = Thread.current.mailbox begin mailbox << AsyncCall.new(our_mailbox, meth, args, block) rescue MailboxError # Silently swallow asynchronous calls to dead actors. There's no way # to reliably generate DeadActorErrors for async calls, so users of # async calls should find other ways to deal with actors dying # during an async call (i.e. linking/supervisors) end end # Wrap the given subject with an Actor def initialize(subject) @subject = subject if subject.respond_to? :mailbox_factory @mailbox = subject.mailbox_factory else @mailbox = Celluloid::Mailbox.new end @links = Links.new @signals = Signals.new @receivers = Receivers.new @proxy = ActorProxy.new(@mailbox) @running = true @pending_calls = {} @thread = Pool.get do Thread.current[:actor] = self Thread.current[:actor_proxy] = @proxy Thread.current[:mailbox] = @mailbox run end end # Is this actor alive? def alive? @running end # Terminate this actor def terminate @running = false nil end # Send a signal with the given name to all waiting methods def signal(name, value = nil) @signals.send name, value end # Wait for the given signal def wait(name) @signals.wait name end # Receive an asynchronous message def receive(&block) @receivers.receive(&block) end # Run the actor loop def run while @running begin message = @mailbox.receive rescue ExitEvent => exit_event fiber = Celluloid.fiber { handle_exit_event exit_event; nil } run_fiber fiber retry end handle_message message end cleanup ExitEvent.new(@proxy) rescue MailboxShutdown # If the mailbox detects shutdown, exit the actor @running = false rescue Exception => ex @running = false handle_crash(ex) ensure Pool.put @thread end # Run a method, handling when its Fiber is suspended def run_fiber(fiber, value = nil) result = fiber.resume value if result.is_a? Celluloid::Call @pending_calls[result.id] = fiber if fiber.alive? elsif result warning = "non-call returned from fiber: #{result.class}" Celluloid.logger.debug warning if Celluloid.logger end nil end # Handle an incoming message def handle_message(message) case message when Call fiber = Celluloid.fiber { message.dispatch(@subject); nil } run_fiber fiber when Response fiber = @pending_calls.delete(message.call_id) if fiber run_fiber fiber, message else warning = "spurious response to call #{message.call_id}" Celluloid.logger.debug if Celluloid.logger end else @receivers.handle_message(message) end message end # Handle exit events received by this actor def handle_exit_event(exit_event) exit_handler = @subject.class.exit_handler if exit_handler return @subject.send(exit_handler, exit_event.actor, exit_event.reason) end # Reraise exceptions from linked actors # If no reason is given, actor terminated cleanly raise exit_event.reason if exit_event.reason end # Handle any exceptions that occur within a running actor def handle_crash(exception) log_error(exception) cleanup ExitEvent.new(@proxy, exception) rescue Exception => handler_exception log_error(handler_exception, "ERROR HANDLER CRASHED!") end # Handle cleaning up this actor after it exits def cleanup(exit_event) @mailbox.shutdown @links.send_event exit_event begin @subject.finalize if @subject.respond_to? :finalize rescue Exception => finalizer_exception log_error(finalizer_exception, "#{@subject.class}#finalize crashed!") end end # Log errors when an actor crashes def log_error(ex, message = "#{@subject.class} crashed!") message << "\n#{ex.class}: #{ex.to_s}\n" message << ex.backtrace.join("\n") Celluloid.logger.error message if Celluloid.logger end end end