lib/celluloid.rb in celluloid-0.11.1 vs lib/celluloid.rb in celluloid-0.12.0.pre

- old
+ new

@@ -2,55 +2,28 @@ require 'thread' require 'timeout' require 'set' module Celluloid + extend self # expose all instance methods as singleton methods + SHUTDOWN_TIMEOUT = 120 # How long actors have to terminate - @logger = Logger.new STDERR class << self - attr_accessor :logger # Thread-safe logger class + attr_accessor :logger # Thread-safe logger class + attr_accessor :task_class # Default task type to use def included(klass) - klass.send :extend, ClassMethods + klass.send :extend, ClassMethods + klass.send :include, InstanceMethods end # Are we currently inside of an actor? def actor? !!Thread.current[:actor] end - # Is current actor running in exclusive mode? - def exclusive? - actor? and Thread.current[:actor].exclusive? - end - - # Obtain the currently running actor (if one exists) - def current_actor - Actor.current - end - - # Receive an asynchronous message - def receive(timeout = nil, &block) - actor = Thread.current[:actor] - if actor - actor.receive(timeout, &block) - else - Thread.mailbox.receive(timeout, &block) - end - end - - # Sleep letting the actor continue processing messages - def sleep(interval) - actor = Thread.current[:actor] - if actor - actor.sleep(interval) - else - Kernel.sleep interval - end - end - # Generate a Universally Unique Identifier def uuid UUID.generate end @@ -65,31 +38,29 @@ def exception_handler(&block) Logger.exception_handler(&block) end # Shut down all running actors - # FIXME: This should probably attempt a graceful shutdown of the supervision - # tree before iterating through all actors and telling them to terminate. def shutdown Timeout.timeout(SHUTDOWN_TIMEOUT) do actors = Actor.all Logger.info "Terminating #{actors.size} actors..." if actors.size > 0 # Attempt to shut down the supervision tree, if available Supervisor.root.terminate if Supervisor.root # Actors cannot self-terminate, you must do it for them - terminators = Actor.all.each do |actor| + Actor.all.each do |actor| begin - actor.future(:terminate) + actor.terminate! rescue DeadActorError, MailboxError end end - terminators.each do |terminator| + Actor.all.each do |actor| begin - terminator.value + Actor.join(actor) rescue DeadActorError, MailboxError end end Logger.info "Shutdown completed cleanly" @@ -102,23 +73,22 @@ # Class methods added to classes which include Celluloid module ClassMethods # Create a new actor def new(*args, &block) - proxy = Actor.new(allocate).proxy + proxy = Actor.new(allocate, actor_options).proxy proxy._send_(:initialize, *args, &block) proxy end alias_method :spawn, :new # Create a new actor and link to the current one def new_link(*args, &block) - current_actor = Actor.current - raise NotActorError, "can't link outside actor context" unless current_actor + raise NotActorError, "can't link outside actor context" unless Celluloid.actor? - proxy = Actor.new(allocate).proxy - current_actor.link proxy + proxy = Actor.new(allocate, actor_options).proxy + Actor.link(proxy) proxy._send_(:initialize, *args, &block) proxy end alias_method :spawn_link, :new_link @@ -156,47 +126,107 @@ # Trap errors from actors we're linked to when they exit def trap_exit(callback) @exit_handler = callback.to_sym end - # Obtain the exit handler for this actor - attr_reader :exit_handler - # Configure a custom mailbox factory def use_mailbox(klass = nil, &block) if block @mailbox_factory = block else @mailbox_factory = proc { klass.new } end end + # Define the default task type for this class + def task_class(klass) + @task_class = klass + end + # Mark methods as running exclusively def exclusive(*methods) - @exclusive_methods ||= Set.new - @exclusive_methods.merge methods.map(&:to_sym) + if methods.empty? + @exclusive_methods = :all + elsif @exclusive_methods != :all + @exclusive_methods ||= Set.new + @exclusive_methods.merge methods.map(&:to_sym) + end end - attr_reader :exclusive_methods # Create a mailbox for this actor def mailbox_factory if defined?(@mailbox_factory) @mailbox_factory.call - elsif defined?(super) - super + elsif superclass.respond_to? :mailbox_factory + superclass.mailbox_factory else Mailbox.new end end + # Configuration options for Actor#new + def actor_options + { + :mailbox => mailbox_factory, + :exit_handler => @exit_handler, + :exclusive_methods => @exclusive_methods, + :task_class => @task_class, + } + end + def ===(other) other.kind_of? self end end + # These are methods we don't want added to the Celluloid singleton but to be + # defined on all classes that use Celluloid + module InstanceMethods + # Obtain the Ruby object the actor is wrapping. This should ONLY be used + # for a limited set of use cases like runtime metaprogramming. Interacting + # directly with the wrapped object foregoes any kind of thread safety that + # Celluloid would ordinarily provide you, and the object is guaranteed to + # be shared with at least the actor thread. Tread carefully. + def wrapped_object; self; end + + def inspect + str = "#<Celluloid::Actor(#{self.class}:0x#{object_id.to_s(16)})" + ivars = instance_variables.map do |ivar| + "#{ivar}=#{instance_variable_get(ivar).inspect}" + end + + str << " " << ivars.join(' ') unless ivars.empty? + str << ">" + end + + # Process async calls via method_missing + def method_missing(meth, *args, &block) + # bang methods are async calls + if meth.to_s.match(/!$/) + unbanged_meth = meth.to_s.sub(/!$/, '') + args.unshift unbanged_meth + + call = AsyncCall.new(:__send__, args, block) + begin + Thread.current[:actor].mailbox << call + rescue MailboxError + # Silently swallow asynchronous calls to dead actors. There's no way + # to reliably generate DeadActorErrors for async calls, so users of + # async calls should find other ways to deal with actors dying + # during an async call (i.e. linking/supervisors) + end + + return + end + + super + end + end + # - # Instance methods + # The following methods are available on both the Celluloid singleton and + # directly inside of all classes that include Celluloid # # Is this actor alive? def alive? Thread.current[:actor].alive? @@ -215,20 +245,10 @@ # Terminate this actor def terminate Thread.current[:actor].terminate end - def inspect - str = "#<Celluloid::Actor(#{self.class}:0x#{object_id.to_s(16)})" - ivars = instance_variables.map do |ivar| - "#{ivar}=#{instance_variable_get(ivar).inspect}" - end - - str << " " << ivars.join(' ') unless ivars.empty? - str << ">" - end - # Send a signal with the given name to all waiting methods def signal(name, value = nil) Thread.current[:actor].signal name, value end @@ -250,66 +270,75 @@ # Obtain the running tasks for this actor def tasks Thread.current[:actor].tasks.to_a end - # Obtain the Ruby object the actor is wrapping. This should ONLY be used - # for a limited set of use cases like runtime metaprogramming. Interacting - # directly with the wrapped object foregoes any kind of thread safety that - # Celluloid would ordinarily provide you, and the object is guaranteed to - # be shared with at least the actor thread. Tread carefully. - def wrapped_object; self; end - # Obtain the Celluloid::Links for this actor def links Thread.current[:actor].links end + # Watch for exit events from another actor + def monitor(actor) + Actor.monitor(actor) + end + + # Stop waiting for exit events from another actor + def unmonitor(actor) + Actor.unmonitor(actor) + end + # Link this actor to another, allowing it to crash or react to errors def link(actor) - actor.notify_link Actor.current - notify_link actor + Actor.link(actor) end # Remove links to another actor def unlink(actor) - actor.notify_unlink Actor.current - notify_unlink actor + Actor.unlink(actor) end - def notify_link(actor) - links << actor + # Are we monitoring another actor? + def monitoring?(actor) + Actor.monitoring?(actor) end - def notify_unlink(actor) - links.delete actor - end - # Is this actor linked to another? def linked_to?(actor) - Thread.current[:actor].links.include? actor + Actor.linked_to?(actor) end # Receive an asynchronous message via the actor protocol def receive(timeout = nil, &block) - Celluloid.receive(timeout, &block) + actor = Thread.current[:actor] + if actor + actor.receive(timeout, &block) + else + Thread.mailbox.receive(timeout, &block) + end end - # Sleep while letting the actor continue to receive messages + # Sleep letting the actor continue processing messages def sleep(interval) - Celluloid.sleep(interval) + actor = Thread.current[:actor] + if actor + actor.sleep(interval) + else + Kernel.sleep interval + end end # Run given block in an exclusive mode: all synchronous calls block the whole # actor, not only current message processing. def exclusive(&block) Thread.current[:actor].exclusive(&block) end # Are we currently exclusive def exclusive? - Celluloid.exclusive? + actor = Thread.current[:actor] + actor && actor.exclusive? end # Call a block after a given interval, returning a Celluloid::Timer object def after(interval, &block) Thread.current[:actor].after(interval, &block) @@ -336,56 +365,36 @@ # Handle calls to future within an actor itself def future(meth, *args, &block) Actor.future Thread.current[:actor].mailbox, meth, *args, &block end - - # Process async calls via method_missing - def method_missing(meth, *args, &block) - # bang methods are async calls - if meth.to_s.match(/!$/) - unbanged_meth = meth.to_s.sub(/!$/, '') - args.unshift unbanged_meth - - call = AsyncCall.new(:__send__, args, block) - begin - Thread.current[:actor].mailbox << call - rescue MailboxError - # Silently swallow asynchronous calls to dead actors. There's no way - # to reliably generate DeadActorErrors for async calls, so users of - # async calls should find other ways to deal with actors dying - # during an async call (i.e. linking/supervisors) - end - - return - end - - super - end end require 'celluloid/version' require 'celluloid/actor_proxy' require 'celluloid/calls' require 'celluloid/core_ext' require 'celluloid/cpu_counter' -require 'celluloid/events' require 'celluloid/fiber' require 'celluloid/fsm' require 'celluloid/internal_pool' require 'celluloid/links' require 'celluloid/logger' require 'celluloid/mailbox' +require 'celluloid/method' require 'celluloid/receivers' require 'celluloid/registry' require 'celluloid/responses' require 'celluloid/signals' +require 'celluloid/system_events' require 'celluloid/task' require 'celluloid/thread_handle' require 'celluloid/uuid' require 'celluloid/actor' require 'celluloid/future' require 'celluloid/pool_manager' require 'celluloid/supervision_group' require 'celluloid/supervisor' require 'celluloid/notifications' + +require 'celluloid/boot'