lib/celluloid/actor.rb in celluloid-0.2.2 vs lib/celluloid/actor.rb in celluloid-0.5.0
- old
+ new
@@ -1,282 +1,188 @@
require 'thread'
begin
require 'fiber'
rescue LoadError => ex
- # If we're on Rubinius, we can still work in 1.8 mode
- if defined? Rubinius
+ if defined? JRUBY_VERSION
+ raise LoadError, "Celluloid requires JRuby 1.9 mode. Please pass the --1.9 flag or set JRUBY_OPTS=--1.9"
+ elsif defined? Rubinius
+ # If we're on Rubinius, we can still work in 1.8 mode
Fiber = Rubinius::Fiber
else
raise ex
end
end
module Celluloid
- # Raised when trying to do Actor-like things outside Actor-space
+ # Don't do Actor-like things outside Actor scope
class NotActorError < StandardError; end
-
- # Raised when we're asked to do something to a dead actor
+
+ # Trying to do something to a dead actor
class DeadActorError < StandardError; end
-
- # Raised when the caller makes an error that shouldn't crash this actor
+
+ # The caller made an error, not the current actor
class AbortError < StandardError
attr_reader :cause
-
+
def initialize(cause)
@cause = cause
super "caused by #{cause.inspect}: #{cause.to_s}"
end
end
-
- # Are we currently inside of an actor?
- def self.actor?
- !!Thread.current[:actor]
- end
-
- # Obtain the currently running actor (if one exists)
- def self.current_actor
- actor = Thread.current[:actor_proxy]
- raise NotActorError, "not in actor scope" unless actor
-
- actor
- end
-
+
# Actors are Celluloid's concurrency primitive. They're implemented as
# normal Ruby objects wrapped in threads which communicate with asynchronous
- # messages. The implementation is inspired by Erlang's gen_server
- module Actor
- # Methods added to classes which include Celluloid
- module ClassMethods
- # Create a new actor
- def spawn(*args, &block)
- actor = allocate
- proxy = actor.__start_actor
- proxy.send(:initialize, *args, &block)
- proxy
- end
- alias_method :new, :spawn
-
- # Create a new actor and link to the current one
- def spawn_link(*args, &block)
- current_actor = Thread.current[:actor]
- raise NotActorError, "can't link outside actor context" unless current_actor
+ # messages.
+ class Actor
+ extend Registry
+ include Linking
- actor = allocate
- proxy = actor.__start_actor
- current_actor.link actor
- proxy.send(:initialize, *args, &block)
-
- proxy
+ attr_reader :proxy
+ attr_reader :links
+ attr_reader :mailbox
+
+
+ # Wrap the given subject object with an Actor
+ def initialize(subject)
+ @subject = subject
+ @mailbox = initialize_mailbox
+ @links = Links.new
+ @signals = Signals.new
+ @proxy = ActorProxy.new(self, @mailbox)
+ @running = true
+
+ @thread = Pool.get
+ @thread[:queue] << proc do
+ initialize_thread_locals
+ run
end
- alias_method :new_link, :spawn_link
-
- # Create a supervisor which ensures an instance of an actor will restart
- # an actor if it fails
- def supervise(*args, &block)
- Celluloid::Supervisor.supervise(self, *args, &block)
- end
-
- # Create a supervisor which ensures an instance of an actor will restart
- # an actor if it fails, and keep the actor registered under a given name
- def supervise_as(name, *args, &block)
- Celluloid::Supervisor.supervise_as(name, self, *args, &block)
- end
-
- # Trap errors from actors we're linked to when they exit
- def trap_exit(callback)
- @_exit_handler = callback.to_sym
- end
-
- # Obtain the exit handler method for this class
- def exit_handler; @_exit_handler; end
end
-
- # Instance methods added to the public API
- module InstanceMethods
- # Obtain the mailbox of this actor
- def mailbox; @_mailbox; end
-
- # Is this actor alive?
- def alive?
- @_thread.alive?
- end
-
- # Raise an exception in caller context, but stay running
- def abort(cause)
- raise AbortError.new(cause)
- end
-
- # Terminate this actor
- def terminate
- @_running = false
- end
-
- def inspect
- str = "#<Celluloid::Actor(#{self.class}:0x#{self.object_id.to_s(16)})"
-
- ivars = []
- instance_variables.each do |ivar|
- ivar_name = ivar.to_s.sub(/^@_/, '')
- next if %w(mailbox links signals proxy thread running).include? ivar_name
- ivars << "#{ivar}=#{instance_variable_get(ivar).inspect}"
- end
-
- str << " " << ivars.join(' ') unless ivars.empty?
- str << ">"
- end
-
- #
- # Signals
- #
-
- # Send a signal with the given name to all waiting methods
- def signal(name, value = nil)
- @_signals.send name, value
- end
-
- # Wait for the given signal
- def wait(name)
- @_signals.wait name
- end
-
- #
- # Async calls
- #
-
- def method_missing(meth, *args, &block)
- # bang methods are async calls
- if meth.to_s.match(/!$/)
- unbanged_meth = meth.to_s.sub(/!$/, '')
- begin
- @_mailbox << AsyncCall.new(@_mailbox, unbanged_meth, args, block)
- rescue MailboxError
- # Silently swallow asynchronous calls to dead actors. There's no way
- # to reliably generate DeadActorErrors for async calls, so users of
- # async calls should find other ways to deal with actors dying
- # during an async call (i.e. linking/supervisors)
+ # Create the mailbox for this actor
+ #
+ # This implemententation is intended to be overridden in special-case
+ # subclasses of Celluloid::Actor which use a custom mailbox
+ def initialize_mailbox
+ Mailbox.new
+ end
+
+ # Configure thread locals for the running thread
+ def initialize_thread_locals
+ Thread.current[:actor] = self
+ Thread.current[:actor_proxy] = @proxy
+ Thread.current[:mailbox] = @mailbox
+ end
+
+ # Run the actor loop
+ def run
+ process_messages
+ cleanup ExitEvent.new(@proxy)
+ rescue Exception => ex
+ @running = false
+ handle_crash(ex)
+ ensure
+ Pool.put @thread
+ end
+
+ # Is this actor alive?
+ def alive?
+ @running
+ end
+
+ # Terminate this actor
+ def terminate
+ @running = false
+ nil
+ end
+
+ # Send a signal with the given name to all waiting methods
+ def signal(name, value = nil)
+ @signals.send name, value
+ end
+
+ # Wait for the given signal
+ def wait(name)
+ @signals.wait name
+ end
+
+ #######
+ private
+ #######
+
+ # Process incoming messages
+ def process_messages
+ pending_calls = {}
+
+ while @running
+ begin
+ message = @mailbox.receive
+ rescue MailboxShutdown
+ # If the mailbox detects shutdown, exit the actor
+ @running = false
+ rescue ExitEvent => exit_event
+ fiber = Fiber.new do
+ initialize_thread_locals
+ handle_exit_event exit_event
end
-
- return # casts are async and return immediately
+
+ call = fiber.resume
+ pending_calls[call] = fiber if fiber.alive?
+
+ retry
end
-
- super
- end
- end
-
- # Internal methods not intended as part of the public API
- module InternalMethods
- # Actor-specific initialization and startup
- def __start_actor(*args, &block)
- @_mailbox = Mailbox.new
- @_links = Links.new
- @_signals = Signals.new
- @_proxy = ActorProxy.new(self, @_mailbox)
- @_running = true
-
- @_thread = Thread.new do
- __init_thread
- __run_actor
- end
-
- @_proxy
- end
-
- # Configure thread locals for the running thread
- def __init_thread
- Thread.current[:actor] = self
- Thread.current[:actor_proxy] = @_proxy
- Thread.current[:mailbox] = @_mailbox
- end
-
- # Run the actor
- def __run_actor
- __process_messages
- __cleanup ExitEvent.new(@_proxy)
- rescue Exception => ex
- __handle_crash(ex)
- ensure
- Thread.current.exit
- end
-
- # Process incoming messages
- def __process_messages
- pending_calls = {}
-
- while @_running
- begin
- message = @_mailbox.receive
- rescue ExitEvent => exit_event
- fiber = Fiber.new do
- __init_thread
- __handle_exit_event exit_event
- end
-
- call = fiber.resume
- pending_calls[call] = fiber if fiber.alive?
-
- retry
+
+ case message
+ when Call
+ fiber = Fiber.new do
+ initialize_thread_locals
+ message.dispatch(@subject)
end
-
- case message
- when Call
- fiber = Fiber.new do
- __init_thread
- message.dispatch(self)
- end
-
- call = fiber.resume
+
+ call = fiber.resume
+ pending_calls[call] = fiber if fiber.alive?
+ when Response
+ fiber = pending_calls.delete(message.call)
+
+ if fiber
+ call = fiber.resume message
pending_calls[call] = fiber if fiber.alive?
- when Response
- fiber = pending_calls.delete(message.call)
-
- if fiber
- call = fiber.resume message
- pending_calls[call] = fiber if fiber.alive?
- end
- end # unexpected messages are ignored
- end
+ end
+ end # unexpected messages are ignored
end
-
- # Handle exit events received by this actor
- def __handle_exit_event(exit_event)
- exit_handler = self.class.exit_handler
- if exit_handler
- return send(exit_handler, exit_event.actor, exit_event.reason)
- end
-
- # Reraise exceptions from linked actors
- # If no reason is given, actor terminated cleanly
- raise exit_event.reason if exit_event.reason
+ end
+
+ # Handle exit events received by this actor
+ def handle_exit_event(exit_event)
+ klass = @subject.class
+ exit_handler = klass.exit_handler if klass.respond_to? :exit_handler
+ if exit_handler
+ return @subject.send(exit_handler, exit_event.actor, exit_event.reason)
end
-
- # Handle any exceptions that occur within a running actor
- def __handle_crash(exception)
- __log_error(exception)
- __cleanup ExitEvent.new(@_proxy, exception)
- rescue Exception => handler_exception
- __log_error(handler_exception, "ERROR HANDLER CRASHED!")
- end
-
- # Handle cleaning up this actor after it exits
- def __cleanup(exit_event)
- @_mailbox.shutdown
- @_links.send_event exit_event
- end
-
- # Log errors when an actor crashes
- def __log_error(ex, message = "#{self.class} crashed!")
- message << "\n#{ex.class}: #{ex.to_s}\n"
- message << ex.backtrace.join("\n")
- Celluloid.logger.error message if Celluloid.logger
- end
+
+ # Reraise exceptions from linked actors
+ # If no reason is given, actor terminated cleanly
+ raise exit_event.reason if exit_event.reason
end
-
- def self.included(klass)
- klass.extend ClassMethods
- klass.send :include, InstanceMethods
- klass.send :include, InternalMethods
- klass.send :include, Linking
+
+ # Handle any exceptions that occur within a running actor
+ def handle_crash(exception)
+ log_error(exception)
+ cleanup ExitEvent.new(@proxy, exception)
+ rescue Exception => handler_exception
+ log_error(handler_exception, "ERROR HANDLER CRASHED!")
end
+
+ # Handle cleaning up this actor after it exits
+ def cleanup(exit_event)
+ @mailbox.shutdown
+ @links.send_event exit_event
+ end
+
+ # Log errors when an actor crashes
+ def log_error(ex, message = "#{@subject.class} crashed!")
+ message << "\n#{ex.class}: #{ex.to_s}\n"
+ message << ex.backtrace.join("\n")
+ Celluloid.logger.error message if Celluloid.logger
+ end
end
-end
\ No newline at end of file
+end