lib/celluloid.rb in celluloid-0.14.1 vs lib/celluloid.rb in celluloid-0.15.0.pre
- old
+ new
@@ -1,11 +1,18 @@
require 'logger'
require 'thread'
require 'timeout'
require 'set'
+if defined?(JRUBY_VERSION) && JRUBY_VERSION == "1.7.3"
+ raise "Celluloid is broken on JRuby 1.7.3. Please upgrade to 1.7.4+"
+end
+
module Celluloid
+ VERSION = '0.15.0.pre'
+ Error = Class.new StandardError
+
extend self # expose all instance methods as singleton methods
# Warning message added to Celluloid objects accessed outside their actors
BARE_OBJECT_WARNING_MESSAGE = "WARNING: BARE CELLULOID OBJECT "
@@ -16,10 +23,28 @@
attr_accessor :shutdown_timeout # How long actors have to terminate
def included(klass)
klass.send :extend, ClassMethods
klass.send :include, InstanceMethods
+
+ klass.send :extend, Properties
+
+ klass.property :mailbox_class, :default => Celluloid::Mailbox
+ klass.property :proxy_class, :default => Celluloid::ActorProxy
+ klass.property :task_class, :default => Celluloid.task_class
+ klass.property :mailbox_size
+
+ klass.property :execute_block_on_receiver,
+ :default => [:after, :every, :receive],
+ :multi => true
+
+ klass.property :finalizer
+ klass.property :exit_handler
+
+ klass.send(:define_singleton_method, :trap_exit) do |*args|
+ exit_handler(*args)
+ end
end
# Are we currently inside of an actor?
def actor?
!!Thread.current[:celluloid_actor]
@@ -46,10 +71,22 @@
def stack_dump(output = STDERR)
Celluloid::StackDump.new.dump(output)
end
alias_method :dump, :stack_dump
+ # Detect if a particular call is recursing through multiple actors
+ def detect_recursion
+ actor = Thread.current[:celluloid_actor]
+ return unless actor
+
+ task = Thread.current[:celluloid_task]
+ return unless task
+
+ chain_id = CallChain.current_id
+ actor.tasks.to_a.any? { |t| t != task && t.chain_id == chain_id }
+ end
+
# Define an exception handler for actor crashes
def exception_handler(&block)
Logger.exception_handler(&block)
end
@@ -61,14 +98,22 @@
else
waiter.wait
end
end
+ def boot
+ init
+ start
+ end
+
+ def init
+ self.internal_pool = InternalPool.new
+ end
+
# Launch default services
# FIXME: We should set up the supervision hierarchy here
- def boot
- internal_pool.reset
+ def start
Celluloid::Notifications::Fanout.supervise_as :notifications_fanout
Celluloid::IncidentReporter.supervise_as :default_incident_reporter, STDERR
end
def register_shutdown
@@ -88,39 +133,50 @@
@shutdown_registered = true
end
# Shut down all running actors
def shutdown
+ actors = Actor.all
+
Timeout.timeout(shutdown_timeout) do
internal_pool.shutdown
- actors = Actor.all
- Logger.debug "Terminating #{actors.size} actors..." if actors.size > 0
+ Logger.debug "Terminating #{actors.size} #{(actors.size > 1) ? 'actors' : 'actor'}..." if actors.size > 0
# Attempt to shut down the supervision tree, if available
Supervisor.root.terminate if Supervisor.root
# Actors cannot self-terminate, you must do it for them
actors.each do |actor|
begin
actor.terminate!
- rescue DeadActorError, MailboxError
+ rescue DeadActorError
end
end
actors.each do |actor|
begin
Actor.join(actor)
- rescue DeadActorError, MailboxError
+ rescue DeadActorError
end
end
-
- Logger.debug "Shutdown completed cleanly"
end
rescue Timeout::Error
Logger.error("Couldn't cleanly terminate all actors in #{shutdown_timeout} seconds!")
+ actors.each do |actor|
+ begin
+ Actor.kill(actor)
+ rescue DeadActorError, MailboxDead
+ end
+ end
+ ensure
+ internal_pool.kill
end
+
+ def version
+ VERSION
+ end
end
# Class methods added to classes which include Celluloid
module ClassMethods
# Create a new actor
@@ -171,126 +227,36 @@
# Run an actor in the foreground
def run(*args, &block)
Actor.join(new(*args, &block))
end
- # Trap errors from actors we're linked to when they exit
- def exit_handler(callback = nil)
- if callback
- @exit_handler = callback.to_sym
- elsif defined?(@exit_handler)
- @exit_handler
- elsif superclass.respond_to? :exit_handler
- superclass.exit_handler
- end
- end
- alias_method :trap_exit, :exit_handler
-
- # Define a callback to run when the actor is finalized.
- def finalizer(callback = nil)
- if callback
- @finalizer = callback.to_sym
- elsif defined?(@finalizer)
- @finalizer
- elsif superclass.respond_to? :finalizer
- superclass.finalizer
- end
- end
-
- # Define the mailbox class for this class
- def mailbox_class(klass = nil)
- if klass
- mailbox.class = klass
- else
- mailbox.class
- end
- end
-
- def proxy_class(klass = nil)
- if klass
- @proxy_class = klass
- elsif defined?(@proxy_class)
- @proxy_class
- elsif superclass.respond_to? :proxy_class
- superclass.proxy_class
- else
- Celluloid::ActorProxy
- end
- end
-
- # Define the default task type for this class
- def task_class(klass = nil)
- if klass
- @task_class = klass
- elsif defined?(@task_class)
- @task_class
- elsif superclass.respond_to? :task_class
- superclass.task_class
- else
- Celluloid.task_class
- end
- end
-
# Mark methods as running exclusively
def exclusive(*methods)
if methods.empty?
@exclusive_methods = :all
elsif !defined?(@exclusive_methods) || @exclusive_methods != :all
@exclusive_methods ||= Set.new
@exclusive_methods.merge methods.map(&:to_sym)
end
end
- # Mark methods as running blocks on the receiver
- def execute_block_on_receiver(*methods)
- receiver_block_executions.merge methods.map(&:to_sym)
- end
-
- def receiver_block_executions
- @receiver_block_executions ||= Set.new([:after, :every, :receive])
- end
-
# Configuration options for Actor#new
def actor_options
{
- :mailbox => mailbox.build,
+ :mailbox_class => mailbox_class,
+ :mailbox_size => mailbox_size,
:proxy_class => proxy_class,
:task_class => task_class,
:exit_handler => exit_handler,
:exclusive_methods => defined?(@exclusive_methods) ? @exclusive_methods : nil,
- :receiver_block_executions => receiver_block_executions
+ :receiver_block_executions => execute_block_on_receiver
}
end
def ===(other)
other.kind_of? self
end
-
- def mailbox
- @mailbox_factory ||= MailboxFactory.new(self)
- end
-
- class MailboxFactory
- attr_accessor :class, :max_size
-
- def initialize(actor)
- @actor = actor
- @class = nil
- @max_size = nil
- end
-
- def build
- mailbox = mailbox_class.new
- mailbox.max_size = @max_size
- mailbox
- end
-
- private
- def mailbox_class
- @class || (@actor.superclass.respond_to?(:mailbox_class) && @actor.superclass.mailbox_class) || Celluloid::Mailbox
- end
- end
end
# These are methods we don't want added to the Celluloid singleton but to be
# defined on all classes that use Celluloid
module InstanceMethods
@@ -324,10 +290,12 @@
def name
Actor.name
end
def inspect
+ return "..." if Celluloid.detect_recursion
+
str = "#<"
if leaked?
str << Celluloid::BARE_OBJECT_WARNING_MESSAGE
else
@@ -361,11 +329,11 @@
raise AbortError.new(cause)
end
# Terminate this actor
def terminate
- Thread.current[:celluloid_actor].terminate
+ Thread.current[:celluloid_actor].proxy.terminate!
end
# Send a signal with the given name to all waiting methods
def signal(name, value = nil)
Thread.current[:celluloid_actor].signal name, value
@@ -381,11 +349,11 @@
Actor.current
end
# Obtain the UUID of the current call chain
def call_chain_id
- Thread.current[:celluloid_chain_id]
+ CallChain.current_id
end
# Obtain the running tasks for this actor
def tasks
Thread.current[:celluloid_actor].tasks.to_a
@@ -444,20 +412,34 @@
else
Kernel.sleep interval
end
end
+ # Timeout on task suspension (eg Sync calls to other actors)
+ def timeout(duration)
+ bt = caller
+ task = Task.current
+ timer = after(duration) do
+ exception = Task::TimeoutError.new
+ exception.set_backtrace bt
+ task.resume exception
+ end
+ yield
+ ensure
+ timer.cancel if timer
+ end
+
# Run given block in an exclusive mode: all synchronous calls block the whole
# actor, not only current message processing.
def exclusive(&block)
- Thread.current[:celluloid_actor].exclusive(&block)
+ Thread.current[:celluloid_task].exclusive(&block)
end
# Are we currently exclusive
def exclusive?
- actor = Thread.current[:celluloid_actor]
- actor && actor.exclusive?
+ task = Thread.current[:celluloid_task]
+ task && task.exclusive?
end
# Call a block after a given interval, returning a Celluloid::Timer object
def after(interval, &block)
Thread.current[:celluloid_actor].after(interval, &block)
@@ -486,13 +468,12 @@
def future(meth = nil, *args, &block)
Thread.current[:celluloid_actor].proxy.future meth, *args, &block
end
end
-require 'celluloid/version'
-
require 'celluloid/calls'
+require 'celluloid/call_chain'
require 'celluloid/condition'
require 'celluloid/thread'
require 'celluloid/core_ext'
require 'celluloid/cpu_counter'
require 'celluloid/fiber'
@@ -501,17 +482,19 @@
require 'celluloid/links'
require 'celluloid/logger'
require 'celluloid/mailbox'
require 'celluloid/evented_mailbox'
require 'celluloid/method'
+require 'celluloid/properties'
require 'celluloid/receivers'
require 'celluloid/registry'
require 'celluloid/responses'
require 'celluloid/signals'
require 'celluloid/stack_dump'
require 'celluloid/system_events'
require 'celluloid/tasks'
+require 'celluloid/task_set'
require 'celluloid/thread_handle'
require 'celluloid/uuid'
require 'celluloid/proxies/abstract_proxy'
require 'celluloid/proxies/sync_proxy'
@@ -533,5 +516,6 @@
# Configure default systemwide settings
Celluloid.task_class = Celluloid::TaskFiber
Celluloid.logger = Logger.new(STDERR)
Celluloid.shutdown_timeout = 10
Celluloid.register_shutdown
+Celluloid.init