lib/celluloid/actor.rb in celluloid-0.0.3 vs lib/celluloid/actor.rb in celluloid-0.1.0

- old
+ new

@@ -1,5 +1,18 @@ +require 'thread' + +begin + require 'fiber' +rescue LoadError => ex + # If we're on Rubinius, we can still work in 1.8 mode + if defined? Rubinius + Fiber = Rubinius::Fiber + else + raise ex + end +end + module Celluloid # Raised when trying to do Actor-like things outside Actor-space class NotActorError < StandardError; end # Raised when we're asked to do something to a dead actor @@ -13,10 +26,23 @@ @cause = cause super "caused by #{cause.inspect}: #{cause.to_s}" end end + # Are we currently inside of an actor? + def self.actor? + !!Thread.current[:actor] + end + + # Obtain the currently running actor (if one exists) + def self.current_actor + actor = Thread.current[:actor_proxy] + raise NotActorError, "not in actor scope" unless actor + + actor + end + # Actors are Celluloid's concurrency primitive. They're implemented as # normal Ruby objects wrapped in threads which communicate with asynchronous # messages. The implementation is inspired by Erlang's gen_server module Actor attr_reader :mailbox @@ -31,10 +57,11 @@ actor = allocate proxy = actor.__start_actor proxy.send(:initialize, *args, &block) proxy end + alias_method :new, :spawn # Create a new actor and link to the current one def spawn_link(*args, &block) current_actor = Thread.current[:actor] raise NotActorError, "can't link outside actor context" unless current_actor @@ -45,10 +72,11 @@ current_actor.link actor proxy.send(:initialize, *args, &block) proxy end + alias_method :new_link, :spawn_link # Create a supervisor which ensures an instance of an actor will restart # an actor if it fails def supervise(*args, &block) Celluloid::Supervisor.supervise(self, *args, &block) @@ -61,32 +89,31 @@ end # Trap errors from actors we're linked to when they exit def trap_exit(callback) @exit_handler = callback.to_sym - end + end end # Instance methods added to the public API module InstanceMethods - # Is this object functioning as an actor? - def actor? - !!@mailbox - end - # Is this actor alive? def alive? @thread.alive? end # Raise an exception in caller context, but stay running def abort(cause) raise AbortError.new(cause) end + # Terminate this actor + def terminate + @running = false + end + def inspect - return super unless actor? str = "#<Celluloid::Actor(#{self.class}:0x#{self.object_id.to_s(16)})" ivars = [] instance_variables.each do |ivar| next if %w(@mailbox @links @proxy @thread).include? ivar.to_s @@ -103,70 +130,109 @@ # Actor-specific initialization and startup def __start_actor(*args, &block) @mailbox = Mailbox.new @links = Links.new @proxy = ActorProxy.new(self, @mailbox) + @running = true @thread = Thread.new do - Thread.current[:actor] = self - Thread.current[:mailbox] = @mailbox + __init_thread __run_actor end @proxy end + + # Configure thread locals for the running thread + def __init_thread + Thread.current[:actor] = self + Thread.current[:actor_proxy] = @proxy + Thread.current[:mailbox] = @mailbox + end # Run the actor def __run_actor __process_messages + __cleanup ExitEvent.new(@proxy) rescue Exception => ex __handle_crash(ex) + ensure + Thread.current.exit end # Process incoming messages def __process_messages - while true # instead of loop, for speed! + pending_calls = {} + + while @running begin - call = @mailbox.receive - rescue ExitEvent => event - __handle_exit(event) + message = @mailbox.receive + rescue ExitEvent => exit_event + fiber = Fiber.new do + __init_thread + __handle_exit_event exit_event + end + + call = fiber.resume + pending_calls[call] = fiber if fiber.alive? + retry end + + case message + when SyncCall + fiber = Fiber.new do + __init_thread + message.dispatch(self) + end - call.dispatch(self) + call = fiber.resume + pending_calls[call] = fiber if fiber.alive? + when Response + fiber = pending_calls.delete(message.call) + + if fiber + call = fiber.resume message + pending_calls[call] = fiber if fiber.alive? + end + when AsyncCall + message.dispatch(self) + end # unexpected messages are ignored end end - + # Handle exit events received by this actor - def __handle_exit(exit_event) + def __handle_exit_event(exit_event) exit_handler = self.class.exit_handler - raise exit_event.reason unless exit_handler + if exit_handler + return send(exit_handler, exit_event.actor, exit_event.reason) + end - send exit_handler, exit_event.actor, exit_event.reason + # Reraise exceptions from linked actors + # If no reason is given, actor terminated cleanly + raise exit_event.reason if exit_event.reason end # Handle any exceptions that occur within a running actor def __handle_crash(exception) __log_error(exception) - @mailbox.cleanup - - # Report the exit event to all actors we're linked to - exit_event = ExitEvent.new(@proxy, exception) - - # Propagate the error to all linked actors - @links.each do |actor| - actor.mailbox.system_event exit_event - end + __cleanup ExitEvent.new(@proxy, exception) rescue Exception => handler_exception - __log_error(handler_exception, "/!\\ EXCEPTION IN ERROR HANDLER /!\\") - ensure - Thread.current.exit + __log_error(handler_exception, "ERROR HANDLER CRASHED!") end + # Handle cleaning up this actor after it exits + def __cleanup(exit_event) + @mailbox.shutdown + @links.send_event exit_event + end + # Log errors when an actor crashes # FIXME: This should probably thunk to a real logger - def __log_error(ex, prefix = "!!! CRASH") - puts "#{prefix} #{self.class}: #{ex.class}: #{ex.to_s}\n#{ex.backtrace.join("\n")}" + def __log_error(ex, message = "#{self.class} crashed!") + message << "\n#{ex.class}: #{ex.to_s}\n" + message << ex.backtrace.join("\n") + Celluloid.logger.error message if Celluloid.logger end end def self.included(klass) klass.extend ClassMethods \ No newline at end of file