lib/celluloid.rb in celluloid-0.13.0 vs lib/celluloid.rb in celluloid-0.14.0.pre
- old
+ new
@@ -23,10 +23,15 @@
# Are we currently inside of an actor?
def actor?
!!Thread.current[:celluloid_actor]
end
+ # Retrieve the mailbox for the current thread or lazily initialize it
+ def mailbox
+ Thread.current[:celluloid_mailbox] ||= Celluloid::Mailbox.new
+ end
+
# Generate a Universally Unique Identifier
def uuid
UUID.generate
end
@@ -59,41 +64,61 @@
end
# Launch default services
# FIXME: We should set up the supervision hierarchy here
def boot
+ internal_pool.reset
Celluloid::Notifications::Fanout.supervise_as :notifications_fanout
Celluloid::IncidentReporter.supervise_as :default_incident_reporter, STDERR
end
+ def register_shutdown
+ return if @shutdown_registered
+ # Terminate all actors at exit
+ at_exit do
+ if defined?(RUBY_ENGINE) && RUBY_ENGINE == "ruby" && RUBY_VERSION >= "1.9"
+ # workaround for MRI bug losing exit status in at_exit block
+ # http://bugs.ruby-lang.org/issues/5218
+ exit_status = $!.status if $!.is_a?(SystemExit)
+ Celluloid.shutdown
+ exit exit_status if exit_status
+ else
+ Celluloid.shutdown
+ end
+ end
+ @shutdown_registered = true
+ end
+
# Shut down all running actors
def shutdown
Timeout.timeout(shutdown_timeout) do
+ internal_pool.shutdown
+
actors = Actor.all
Logger.debug "Terminating #{actors.size} actors..." if actors.size > 0
# Attempt to shut down the supervision tree, if available
Supervisor.root.terminate if Supervisor.root
# Actors cannot self-terminate, you must do it for them
- Actor.all.each do |actor|
+ actors.each do |actor|
begin
actor.terminate!
rescue DeadActorError, MailboxError
end
end
- Actor.all.each do |actor|
+ actors.each do |actor|
begin
Actor.join(actor)
rescue DeadActorError, MailboxError
end
end
Logger.debug "Shutdown completed cleanly"
end
- rescue Timeout::Error => ex
+ rescue Timeout::Error
Logger.error("Couldn't cleanly terminate all actors in #{shutdown_timeout} seconds!")
end
end
# Class methods added to classes which include Celluloid
@@ -172,17 +197,13 @@
end
# Define the mailbox class for this class
def mailbox_class(klass = nil)
if klass
- @mailbox_class = klass
- elsif defined?(@mailbox_class)
- @mailbox_class
- elsif superclass.respond_to? :mailbox_class
- superclass.mailbox_class
+ mailbox.class = klass
else
- Celluloid::Mailbox
+ mailbox.class
end
end
def proxy_class(klass = nil)
if klass
@@ -211,36 +232,65 @@
# Mark methods as running exclusively
def exclusive(*methods)
if methods.empty?
@exclusive_methods = :all
- elsif @exclusive_methods != :all
+ elsif !defined?(@exclusive_methods) || @exclusive_methods != :all
@exclusive_methods ||= Set.new
@exclusive_methods.merge methods.map(&:to_sym)
end
end
# Mark methods as running blocks on the receiver
def execute_block_on_receiver(*methods)
- # A noop method in preparation
- # See https://github.com/celluloid/celluloid/pull/55
+ receiver_block_executions.merge methods.map(&:to_sym)
end
+ def receiver_block_executions
+ @receiver_block_executions ||= Set.new([:after, :every, :receive])
+ end
+
# Configuration options for Actor#new
def actor_options
{
- :mailbox => mailbox_class.new,
+ :mailbox => mailbox.build,
:proxy_class => proxy_class,
:task_class => task_class,
:exit_handler => exit_handler,
- :exclusive_methods => defined?(@exclusive_methods) ? @exclusive_methods : nil
+ :exclusive_methods => defined?(@exclusive_methods) ? @exclusive_methods : nil,
+ :receiver_block_executions => receiver_block_executions
}
end
def ===(other)
other.kind_of? self
end
+
+ def mailbox
+ @mailbox_factory ||= MailboxFactory.new(self)
+ end
+
+ class MailboxFactory
+ attr_accessor :class, :max_size
+
+ def initialize(actor)
+ @actor = actor
+ @class = nil
+ @max_size = nil
+ end
+
+ def build
+ mailbox = mailbox_class.new
+ mailbox.max_size = @max_size
+ mailbox
+ end
+
+ private
+ def mailbox_class
+ @class || (@actor.superclass.respond_to?(:mailbox_class) && @actor.superclass.mailbox_class) || Celluloid::Mailbox
+ end
+ end
end
# These are methods we don't want added to the Celluloid singleton but to be
# defined on all classes that use Celluloid
module InstanceMethods
@@ -263,10 +313,20 @@
# Are we being invoked in a different thread from our owner?
def leaked?
@celluloid_owner != Thread.current[:celluloid_actor]
end
+ def tap
+ yield current_actor
+ current_actor
+ end
+
+ # Obtain the name of the current actor
+ def name
+ Actor.name
+ end
+
def inspect
str = "#<"
if leaked?
str << Celluloid::BARE_OBJECT_WARNING_MESSAGE
@@ -289,11 +349,11 @@
#
# The following methods are available on both the Celluloid singleton and
# directly inside of all classes that include Celluloid
#
- # Raise an exception in caller context, but stay running
+ # Raise an exception in sender context, but stay running
def abort(cause)
cause = case cause
when String then RuntimeError.new(cause)
when Exception then cause
else raise TypeError, "Exception object/String expected, but #{cause.class} received"
@@ -324,15 +384,10 @@
# Obtain the UUID of the current call chain
def call_chain_id
Thread.current[:celluloid_chain_id]
end
- # Obtain the name of the current actor
- def name
- Actor.name
- end
-
# Obtain the running tasks for this actor
def tasks
Thread.current[:celluloid_actor].tasks.to_a
end
@@ -375,11 +430,11 @@
def receive(timeout = nil, &block)
actor = Thread.current[:celluloid_actor]
if actor
actor.receive(timeout, &block)
else
- Thread.mailbox.receive(timeout, &block)
+ Celluloid.mailbox.receive(timeout, &block)
end
end
# Sleep letting the actor continue processing messages
def sleep(interval)
@@ -412,49 +467,43 @@
def every(interval, &block)
Thread.current[:celluloid_actor].every(interval, &block)
end
# Perform a blocking or computationally intensive action inside an
- # asynchronous thread pool, allowing the caller to continue processing other
+ # asynchronous thread pool, allowing the sender to continue processing other
# messages in its mailbox in the meantime
def defer(&block)
# This implementation relies on the present implementation of
# Celluloid::Future, which uses a thread from InternalPool to run the block
Future.new(&block).value
end
# Handle async calls within an actor itself
def async(meth = nil, *args, &block)
- if meth
- Actor.async Thread.current[:celluloid_actor].mailbox, meth, *args, &block
- else
- Thread.current[:celluloid_actor].proxy.async
- end
+ Thread.current[:celluloid_actor].proxy.async meth, *args, &block
end
# Handle calls to future within an actor itself
def future(meth = nil, *args, &block)
- if meth
- Actor.future Thread.current[:celluloid_actor].mailbox, meth, *args, &block
- else
- Thread.current[:celluloid_actor].proxy.future
- end
+ Thread.current[:celluloid_actor].proxy.future meth, *args, &block
end
end
require 'celluloid/version'
require 'celluloid/calls'
require 'celluloid/condition'
+require 'celluloid/thread'
require 'celluloid/core_ext'
require 'celluloid/cpu_counter'
require 'celluloid/fiber'
require 'celluloid/fsm'
require 'celluloid/internal_pool'
require 'celluloid/links'
require 'celluloid/logger'
require 'celluloid/mailbox'
+require 'celluloid/evented_mailbox'
require 'celluloid/method'
require 'celluloid/receivers'
require 'celluloid/registry'
require 'celluloid/responses'
require 'celluloid/signals'
@@ -463,13 +512,15 @@
require 'celluloid/tasks'
require 'celluloid/thread_handle'
require 'celluloid/uuid'
require 'celluloid/proxies/abstract_proxy'
+require 'celluloid/proxies/sync_proxy'
require 'celluloid/proxies/actor_proxy'
require 'celluloid/proxies/async_proxy'
require 'celluloid/proxies/future_proxy'
+require 'celluloid/proxies/block_proxy'
require 'celluloid/actor'
require 'celluloid/future'
require 'celluloid/pool_manager'
require 'celluloid/supervision_group'
@@ -481,5 +532,6 @@
# Configure default systemwide settings
Celluloid.task_class = Celluloid::TaskFiber
Celluloid.logger = Logger.new(STDERR)
Celluloid.shutdown_timeout = 10
+Celluloid.register_shutdown