lib/celluloid.rb in celluloid-0.13.0 vs lib/celluloid.rb in celluloid-0.14.0.pre

- old
+ new

@@ -23,10 +23,15 @@ # Are we currently inside of an actor? def actor? !!Thread.current[:celluloid_actor] end + # Retrieve the mailbox for the current thread or lazily initialize it + def mailbox + Thread.current[:celluloid_mailbox] ||= Celluloid::Mailbox.new + end + # Generate a Universally Unique Identifier def uuid UUID.generate end @@ -59,41 +64,61 @@ end # Launch default services # FIXME: We should set up the supervision hierarchy here def boot + internal_pool.reset Celluloid::Notifications::Fanout.supervise_as :notifications_fanout Celluloid::IncidentReporter.supervise_as :default_incident_reporter, STDERR end + def register_shutdown + return if @shutdown_registered + # Terminate all actors at exit + at_exit do + if defined?(RUBY_ENGINE) && RUBY_ENGINE == "ruby" && RUBY_VERSION >= "1.9" + # workaround for MRI bug losing exit status in at_exit block + # http://bugs.ruby-lang.org/issues/5218 + exit_status = $!.status if $!.is_a?(SystemExit) + Celluloid.shutdown + exit exit_status if exit_status + else + Celluloid.shutdown + end + end + @shutdown_registered = true + end + # Shut down all running actors def shutdown Timeout.timeout(shutdown_timeout) do + internal_pool.shutdown + actors = Actor.all Logger.debug "Terminating #{actors.size} actors..." if actors.size > 0 # Attempt to shut down the supervision tree, if available Supervisor.root.terminate if Supervisor.root # Actors cannot self-terminate, you must do it for them - Actor.all.each do |actor| + actors.each do |actor| begin actor.terminate! rescue DeadActorError, MailboxError end end - Actor.all.each do |actor| + actors.each do |actor| begin Actor.join(actor) rescue DeadActorError, MailboxError end end Logger.debug "Shutdown completed cleanly" end - rescue Timeout::Error => ex + rescue Timeout::Error Logger.error("Couldn't cleanly terminate all actors in #{shutdown_timeout} seconds!") end end # Class methods added to classes which include Celluloid @@ -172,17 +197,13 @@ end # Define the mailbox class for this class def mailbox_class(klass = nil) if klass - @mailbox_class = klass - elsif defined?(@mailbox_class) - @mailbox_class - elsif superclass.respond_to? :mailbox_class - superclass.mailbox_class + mailbox.class = klass else - Celluloid::Mailbox + mailbox.class end end def proxy_class(klass = nil) if klass @@ -211,36 +232,65 @@ # Mark methods as running exclusively def exclusive(*methods) if methods.empty? @exclusive_methods = :all - elsif @exclusive_methods != :all + elsif !defined?(@exclusive_methods) || @exclusive_methods != :all @exclusive_methods ||= Set.new @exclusive_methods.merge methods.map(&:to_sym) end end # Mark methods as running blocks on the receiver def execute_block_on_receiver(*methods) - # A noop method in preparation - # See https://github.com/celluloid/celluloid/pull/55 + receiver_block_executions.merge methods.map(&:to_sym) end + def receiver_block_executions + @receiver_block_executions ||= Set.new([:after, :every, :receive]) + end + # Configuration options for Actor#new def actor_options { - :mailbox => mailbox_class.new, + :mailbox => mailbox.build, :proxy_class => proxy_class, :task_class => task_class, :exit_handler => exit_handler, - :exclusive_methods => defined?(@exclusive_methods) ? @exclusive_methods : nil + :exclusive_methods => defined?(@exclusive_methods) ? @exclusive_methods : nil, + :receiver_block_executions => receiver_block_executions } end def ===(other) other.kind_of? self end + + def mailbox + @mailbox_factory ||= MailboxFactory.new(self) + end + + class MailboxFactory + attr_accessor :class, :max_size + + def initialize(actor) + @actor = actor + @class = nil + @max_size = nil + end + + def build + mailbox = mailbox_class.new + mailbox.max_size = @max_size + mailbox + end + + private + def mailbox_class + @class || (@actor.superclass.respond_to?(:mailbox_class) && @actor.superclass.mailbox_class) || Celluloid::Mailbox + end + end end # These are methods we don't want added to the Celluloid singleton but to be # defined on all classes that use Celluloid module InstanceMethods @@ -263,10 +313,20 @@ # Are we being invoked in a different thread from our owner? def leaked? @celluloid_owner != Thread.current[:celluloid_actor] end + def tap + yield current_actor + current_actor + end + + # Obtain the name of the current actor + def name + Actor.name + end + def inspect str = "#<" if leaked? str << Celluloid::BARE_OBJECT_WARNING_MESSAGE @@ -289,11 +349,11 @@ # # The following methods are available on both the Celluloid singleton and # directly inside of all classes that include Celluloid # - # Raise an exception in caller context, but stay running + # Raise an exception in sender context, but stay running def abort(cause) cause = case cause when String then RuntimeError.new(cause) when Exception then cause else raise TypeError, "Exception object/String expected, but #{cause.class} received" @@ -324,15 +384,10 @@ # Obtain the UUID of the current call chain def call_chain_id Thread.current[:celluloid_chain_id] end - # Obtain the name of the current actor - def name - Actor.name - end - # Obtain the running tasks for this actor def tasks Thread.current[:celluloid_actor].tasks.to_a end @@ -375,11 +430,11 @@ def receive(timeout = nil, &block) actor = Thread.current[:celluloid_actor] if actor actor.receive(timeout, &block) else - Thread.mailbox.receive(timeout, &block) + Celluloid.mailbox.receive(timeout, &block) end end # Sleep letting the actor continue processing messages def sleep(interval) @@ -412,49 +467,43 @@ def every(interval, &block) Thread.current[:celluloid_actor].every(interval, &block) end # Perform a blocking or computationally intensive action inside an - # asynchronous thread pool, allowing the caller to continue processing other + # asynchronous thread pool, allowing the sender to continue processing other # messages in its mailbox in the meantime def defer(&block) # This implementation relies on the present implementation of # Celluloid::Future, which uses a thread from InternalPool to run the block Future.new(&block).value end # Handle async calls within an actor itself def async(meth = nil, *args, &block) - if meth - Actor.async Thread.current[:celluloid_actor].mailbox, meth, *args, &block - else - Thread.current[:celluloid_actor].proxy.async - end + Thread.current[:celluloid_actor].proxy.async meth, *args, &block end # Handle calls to future within an actor itself def future(meth = nil, *args, &block) - if meth - Actor.future Thread.current[:celluloid_actor].mailbox, meth, *args, &block - else - Thread.current[:celluloid_actor].proxy.future - end + Thread.current[:celluloid_actor].proxy.future meth, *args, &block end end require 'celluloid/version' require 'celluloid/calls' require 'celluloid/condition' +require 'celluloid/thread' require 'celluloid/core_ext' require 'celluloid/cpu_counter' require 'celluloid/fiber' require 'celluloid/fsm' require 'celluloid/internal_pool' require 'celluloid/links' require 'celluloid/logger' require 'celluloid/mailbox' +require 'celluloid/evented_mailbox' require 'celluloid/method' require 'celluloid/receivers' require 'celluloid/registry' require 'celluloid/responses' require 'celluloid/signals' @@ -463,13 +512,15 @@ require 'celluloid/tasks' require 'celluloid/thread_handle' require 'celluloid/uuid' require 'celluloid/proxies/abstract_proxy' +require 'celluloid/proxies/sync_proxy' require 'celluloid/proxies/actor_proxy' require 'celluloid/proxies/async_proxy' require 'celluloid/proxies/future_proxy' +require 'celluloid/proxies/block_proxy' require 'celluloid/actor' require 'celluloid/future' require 'celluloid/pool_manager' require 'celluloid/supervision_group' @@ -481,5 +532,6 @@ # Configure default systemwide settings Celluloid.task_class = Celluloid::TaskFiber Celluloid.logger = Logger.new(STDERR) Celluloid.shutdown_timeout = 10 +Celluloid.register_shutdown