lib/celluloid/actor.rb in celluloid-0.0.3 vs lib/celluloid/actor.rb in celluloid-0.1.0
- old
+ new
@@ -1,5 +1,18 @@
+require 'thread'
+
+begin
+ require 'fiber'
+rescue LoadError => ex
+ # If we're on Rubinius, we can still work in 1.8 mode
+ if defined? Rubinius
+ Fiber = Rubinius::Fiber
+ else
+ raise ex
+ end
+end
+
module Celluloid
# Raised when trying to do Actor-like things outside Actor-space
class NotActorError < StandardError; end
# Raised when we're asked to do something to a dead actor
@@ -13,10 +26,23 @@
@cause = cause
super "caused by #{cause.inspect}: #{cause.to_s}"
end
end
+ # Are we currently inside of an actor?
+ def self.actor?
+ !!Thread.current[:actor]
+ end
+
+ # Obtain the currently running actor (if one exists)
+ def self.current_actor
+ actor = Thread.current[:actor_proxy]
+ raise NotActorError, "not in actor scope" unless actor
+
+ actor
+ end
+
# Actors are Celluloid's concurrency primitive. They're implemented as
# normal Ruby objects wrapped in threads which communicate with asynchronous
# messages. The implementation is inspired by Erlang's gen_server
module Actor
attr_reader :mailbox
@@ -31,10 +57,11 @@
actor = allocate
proxy = actor.__start_actor
proxy.send(:initialize, *args, &block)
proxy
end
+ alias_method :new, :spawn
# Create a new actor and link to the current one
def spawn_link(*args, &block)
current_actor = Thread.current[:actor]
raise NotActorError, "can't link outside actor context" unless current_actor
@@ -45,10 +72,11 @@
current_actor.link actor
proxy.send(:initialize, *args, &block)
proxy
end
+ alias_method :new_link, :spawn_link
# Create a supervisor which ensures an instance of an actor will restart
# an actor if it fails
def supervise(*args, &block)
Celluloid::Supervisor.supervise(self, *args, &block)
@@ -61,32 +89,31 @@
end
# Trap errors from actors we're linked to when they exit
def trap_exit(callback)
@exit_handler = callback.to_sym
- end
+ end
end
# Instance methods added to the public API
module InstanceMethods
- # Is this object functioning as an actor?
- def actor?
- !!@mailbox
- end
-
# Is this actor alive?
def alive?
@thread.alive?
end
# Raise an exception in caller context, but stay running
def abort(cause)
raise AbortError.new(cause)
end
+ # Terminate this actor
+ def terminate
+ @running = false
+ end
+
def inspect
- return super unless actor?
str = "#<Celluloid::Actor(#{self.class}:0x#{self.object_id.to_s(16)})"
ivars = []
instance_variables.each do |ivar|
next if %w(@mailbox @links @proxy @thread).include? ivar.to_s
@@ -103,70 +130,109 @@
# Actor-specific initialization and startup
def __start_actor(*args, &block)
@mailbox = Mailbox.new
@links = Links.new
@proxy = ActorProxy.new(self, @mailbox)
+ @running = true
@thread = Thread.new do
- Thread.current[:actor] = self
- Thread.current[:mailbox] = @mailbox
+ __init_thread
__run_actor
end
@proxy
end
+
+ # Configure thread locals for the running thread
+ def __init_thread
+ Thread.current[:actor] = self
+ Thread.current[:actor_proxy] = @proxy
+ Thread.current[:mailbox] = @mailbox
+ end
# Run the actor
def __run_actor
__process_messages
+ __cleanup ExitEvent.new(@proxy)
rescue Exception => ex
__handle_crash(ex)
+ ensure
+ Thread.current.exit
end
# Process incoming messages
def __process_messages
- while true # instead of loop, for speed!
+ pending_calls = {}
+
+ while @running
begin
- call = @mailbox.receive
- rescue ExitEvent => event
- __handle_exit(event)
+ message = @mailbox.receive
+ rescue ExitEvent => exit_event
+ fiber = Fiber.new do
+ __init_thread
+ __handle_exit_event exit_event
+ end
+
+ call = fiber.resume
+ pending_calls[call] = fiber if fiber.alive?
+
retry
end
+
+ case message
+ when SyncCall
+ fiber = Fiber.new do
+ __init_thread
+ message.dispatch(self)
+ end
- call.dispatch(self)
+ call = fiber.resume
+ pending_calls[call] = fiber if fiber.alive?
+ when Response
+ fiber = pending_calls.delete(message.call)
+
+ if fiber
+ call = fiber.resume message
+ pending_calls[call] = fiber if fiber.alive?
+ end
+ when AsyncCall
+ message.dispatch(self)
+ end # unexpected messages are ignored
end
end
-
+
# Handle exit events received by this actor
- def __handle_exit(exit_event)
+ def __handle_exit_event(exit_event)
exit_handler = self.class.exit_handler
- raise exit_event.reason unless exit_handler
+ if exit_handler
+ return send(exit_handler, exit_event.actor, exit_event.reason)
+ end
- send exit_handler, exit_event.actor, exit_event.reason
+ # Reraise exceptions from linked actors
+ # If no reason is given, actor terminated cleanly
+ raise exit_event.reason if exit_event.reason
end
# Handle any exceptions that occur within a running actor
def __handle_crash(exception)
__log_error(exception)
- @mailbox.cleanup
-
- # Report the exit event to all actors we're linked to
- exit_event = ExitEvent.new(@proxy, exception)
-
- # Propagate the error to all linked actors
- @links.each do |actor|
- actor.mailbox.system_event exit_event
- end
+ __cleanup ExitEvent.new(@proxy, exception)
rescue Exception => handler_exception
- __log_error(handler_exception, "/!\\ EXCEPTION IN ERROR HANDLER /!\\")
- ensure
- Thread.current.exit
+ __log_error(handler_exception, "ERROR HANDLER CRASHED!")
end
+ # Handle cleaning up this actor after it exits
+ def __cleanup(exit_event)
+ @mailbox.shutdown
+ @links.send_event exit_event
+ end
+
# Log errors when an actor crashes
# FIXME: This should probably thunk to a real logger
- def __log_error(ex, prefix = "!!! CRASH")
- puts "#{prefix} #{self.class}: #{ex.class}: #{ex.to_s}\n#{ex.backtrace.join("\n")}"
+ def __log_error(ex, message = "#{self.class} crashed!")
+ message << "\n#{ex.class}: #{ex.to_s}\n"
+ message << ex.backtrace.join("\n")
+ Celluloid.logger.error message if Celluloid.logger
end
end
def self.included(klass)
klass.extend ClassMethods
\ No newline at end of file