lib/celluloid.rb in celluloid-0.14.1 vs lib/celluloid.rb in celluloid-0.15.0.pre

- old
+ new

@@ -1,11 +1,18 @@ require 'logger' require 'thread' require 'timeout' require 'set' +if defined?(JRUBY_VERSION) && JRUBY_VERSION == "1.7.3" + raise "Celluloid is broken on JRuby 1.7.3. Please upgrade to 1.7.4+" +end + module Celluloid + VERSION = '0.15.0.pre' + Error = Class.new StandardError + extend self # expose all instance methods as singleton methods # Warning message added to Celluloid objects accessed outside their actors BARE_OBJECT_WARNING_MESSAGE = "WARNING: BARE CELLULOID OBJECT " @@ -16,10 +23,28 @@ attr_accessor :shutdown_timeout # How long actors have to terminate def included(klass) klass.send :extend, ClassMethods klass.send :include, InstanceMethods + + klass.send :extend, Properties + + klass.property :mailbox_class, :default => Celluloid::Mailbox + klass.property :proxy_class, :default => Celluloid::ActorProxy + klass.property :task_class, :default => Celluloid.task_class + klass.property :mailbox_size + + klass.property :execute_block_on_receiver, + :default => [:after, :every, :receive], + :multi => true + + klass.property :finalizer + klass.property :exit_handler + + klass.send(:define_singleton_method, :trap_exit) do |*args| + exit_handler(*args) + end end # Are we currently inside of an actor? def actor? !!Thread.current[:celluloid_actor] @@ -46,10 +71,22 @@ def stack_dump(output = STDERR) Celluloid::StackDump.new.dump(output) end alias_method :dump, :stack_dump + # Detect if a particular call is recursing through multiple actors + def detect_recursion + actor = Thread.current[:celluloid_actor] + return unless actor + + task = Thread.current[:celluloid_task] + return unless task + + chain_id = CallChain.current_id + actor.tasks.to_a.any? { |t| t != task && t.chain_id == chain_id } + end + # Define an exception handler for actor crashes def exception_handler(&block) Logger.exception_handler(&block) end @@ -61,14 +98,22 @@ else waiter.wait end end + def boot + init + start + end + + def init + self.internal_pool = InternalPool.new + end + # Launch default services # FIXME: We should set up the supervision hierarchy here - def boot - internal_pool.reset + def start Celluloid::Notifications::Fanout.supervise_as :notifications_fanout Celluloid::IncidentReporter.supervise_as :default_incident_reporter, STDERR end def register_shutdown @@ -88,39 +133,50 @@ @shutdown_registered = true end # Shut down all running actors def shutdown + actors = Actor.all + Timeout.timeout(shutdown_timeout) do internal_pool.shutdown - actors = Actor.all - Logger.debug "Terminating #{actors.size} actors..." if actors.size > 0 + Logger.debug "Terminating #{actors.size} #{(actors.size > 1) ? 'actors' : 'actor'}..." if actors.size > 0 # Attempt to shut down the supervision tree, if available Supervisor.root.terminate if Supervisor.root # Actors cannot self-terminate, you must do it for them actors.each do |actor| begin actor.terminate! - rescue DeadActorError, MailboxError + rescue DeadActorError end end actors.each do |actor| begin Actor.join(actor) - rescue DeadActorError, MailboxError + rescue DeadActorError end end - - Logger.debug "Shutdown completed cleanly" end rescue Timeout::Error Logger.error("Couldn't cleanly terminate all actors in #{shutdown_timeout} seconds!") + actors.each do |actor| + begin + Actor.kill(actor) + rescue DeadActorError, MailboxDead + end + end + ensure + internal_pool.kill end + + def version + VERSION + end end # Class methods added to classes which include Celluloid module ClassMethods # Create a new actor @@ -171,126 +227,36 @@ # Run an actor in the foreground def run(*args, &block) Actor.join(new(*args, &block)) end - # Trap errors from actors we're linked to when they exit - def exit_handler(callback = nil) - if callback - @exit_handler = callback.to_sym - elsif defined?(@exit_handler) - @exit_handler - elsif superclass.respond_to? :exit_handler - superclass.exit_handler - end - end - alias_method :trap_exit, :exit_handler - - # Define a callback to run when the actor is finalized. - def finalizer(callback = nil) - if callback - @finalizer = callback.to_sym - elsif defined?(@finalizer) - @finalizer - elsif superclass.respond_to? :finalizer - superclass.finalizer - end - end - - # Define the mailbox class for this class - def mailbox_class(klass = nil) - if klass - mailbox.class = klass - else - mailbox.class - end - end - - def proxy_class(klass = nil) - if klass - @proxy_class = klass - elsif defined?(@proxy_class) - @proxy_class - elsif superclass.respond_to? :proxy_class - superclass.proxy_class - else - Celluloid::ActorProxy - end - end - - # Define the default task type for this class - def task_class(klass = nil) - if klass - @task_class = klass - elsif defined?(@task_class) - @task_class - elsif superclass.respond_to? :task_class - superclass.task_class - else - Celluloid.task_class - end - end - # Mark methods as running exclusively def exclusive(*methods) if methods.empty? @exclusive_methods = :all elsif !defined?(@exclusive_methods) || @exclusive_methods != :all @exclusive_methods ||= Set.new @exclusive_methods.merge methods.map(&:to_sym) end end - # Mark methods as running blocks on the receiver - def execute_block_on_receiver(*methods) - receiver_block_executions.merge methods.map(&:to_sym) - end - - def receiver_block_executions - @receiver_block_executions ||= Set.new([:after, :every, :receive]) - end - # Configuration options for Actor#new def actor_options { - :mailbox => mailbox.build, + :mailbox_class => mailbox_class, + :mailbox_size => mailbox_size, :proxy_class => proxy_class, :task_class => task_class, :exit_handler => exit_handler, :exclusive_methods => defined?(@exclusive_methods) ? @exclusive_methods : nil, - :receiver_block_executions => receiver_block_executions + :receiver_block_executions => execute_block_on_receiver } end def ===(other) other.kind_of? self end - - def mailbox - @mailbox_factory ||= MailboxFactory.new(self) - end - - class MailboxFactory - attr_accessor :class, :max_size - - def initialize(actor) - @actor = actor - @class = nil - @max_size = nil - end - - def build - mailbox = mailbox_class.new - mailbox.max_size = @max_size - mailbox - end - - private - def mailbox_class - @class || (@actor.superclass.respond_to?(:mailbox_class) && @actor.superclass.mailbox_class) || Celluloid::Mailbox - end - end end # These are methods we don't want added to the Celluloid singleton but to be # defined on all classes that use Celluloid module InstanceMethods @@ -324,10 +290,12 @@ def name Actor.name end def inspect + return "..." if Celluloid.detect_recursion + str = "#<" if leaked? str << Celluloid::BARE_OBJECT_WARNING_MESSAGE else @@ -361,11 +329,11 @@ raise AbortError.new(cause) end # Terminate this actor def terminate - Thread.current[:celluloid_actor].terminate + Thread.current[:celluloid_actor].proxy.terminate! end # Send a signal with the given name to all waiting methods def signal(name, value = nil) Thread.current[:celluloid_actor].signal name, value @@ -381,11 +349,11 @@ Actor.current end # Obtain the UUID of the current call chain def call_chain_id - Thread.current[:celluloid_chain_id] + CallChain.current_id end # Obtain the running tasks for this actor def tasks Thread.current[:celluloid_actor].tasks.to_a @@ -444,20 +412,34 @@ else Kernel.sleep interval end end + # Timeout on task suspension (eg Sync calls to other actors) + def timeout(duration) + bt = caller + task = Task.current + timer = after(duration) do + exception = Task::TimeoutError.new + exception.set_backtrace bt + task.resume exception + end + yield + ensure + timer.cancel if timer + end + # Run given block in an exclusive mode: all synchronous calls block the whole # actor, not only current message processing. def exclusive(&block) - Thread.current[:celluloid_actor].exclusive(&block) + Thread.current[:celluloid_task].exclusive(&block) end # Are we currently exclusive def exclusive? - actor = Thread.current[:celluloid_actor] - actor && actor.exclusive? + task = Thread.current[:celluloid_task] + task && task.exclusive? end # Call a block after a given interval, returning a Celluloid::Timer object def after(interval, &block) Thread.current[:celluloid_actor].after(interval, &block) @@ -486,13 +468,12 @@ def future(meth = nil, *args, &block) Thread.current[:celluloid_actor].proxy.future meth, *args, &block end end -require 'celluloid/version' - require 'celluloid/calls' +require 'celluloid/call_chain' require 'celluloid/condition' require 'celluloid/thread' require 'celluloid/core_ext' require 'celluloid/cpu_counter' require 'celluloid/fiber' @@ -501,17 +482,19 @@ require 'celluloid/links' require 'celluloid/logger' require 'celluloid/mailbox' require 'celluloid/evented_mailbox' require 'celluloid/method' +require 'celluloid/properties' require 'celluloid/receivers' require 'celluloid/registry' require 'celluloid/responses' require 'celluloid/signals' require 'celluloid/stack_dump' require 'celluloid/system_events' require 'celluloid/tasks' +require 'celluloid/task_set' require 'celluloid/thread_handle' require 'celluloid/uuid' require 'celluloid/proxies/abstract_proxy' require 'celluloid/proxies/sync_proxy' @@ -533,5 +516,6 @@ # Configure default systemwide settings Celluloid.task_class = Celluloid::TaskFiber Celluloid.logger = Logger.new(STDERR) Celluloid.shutdown_timeout = 10 Celluloid.register_shutdown +Celluloid.init