lib/celluloid/actor.rb in celluloid-0.11.0 vs lib/celluloid/actor.rb in celluloid-0.11.1
- old
+ new
@@ -1,5 +1,7 @@
+require 'timers'
+
module Celluloid
# Don't do Actor-like things outside Actor scope
class NotActorError < StandardError; end
# Trying to do something to a dead actor
@@ -17,14 +19,25 @@
# Actors are Celluloid's concurrency primitive. They're implemented as
# normal Ruby objects wrapped in threads which communicate with asynchronous
# messages.
class Actor
- extend Registry
attr_reader :subject, :proxy, :tasks, :links, :mailbox, :thread, :name
class << self
+ extend Forwardable
+
+ def_delegators "Celluloid::Registry.root", :[], :[]=
+
+ def registered
+ Registry.root.names
+ end
+
+ def clear_registry
+ Registry.root.clear
+ end
+
# Obtain the current actor
def current
actor = Thread.current[:actor]
raise NotActorError, "not in actor scope" unless actor
actor.proxy
@@ -61,11 +74,11 @@
end
# Invoke a method asynchronously on an actor via its mailbox
def async(mailbox, meth, *args, &block)
begin
- mailbox << AsyncCall.new(Thread.mailbox, meth, args, block)
+ mailbox << AsyncCall.new(meth, args, block)
rescue MailboxError
# Silently swallow asynchronous calls to dead actors. There's no way
# to reliably generate DeadActorErrors for async calls, so users of
# async calls should find other ways to deal with actors dying
# during an async call (i.e. linking/supervisors)
@@ -90,12 +103,15 @@
end
end
# Wrap the given subject with an Actor
def initialize(subject)
- @subject = subject
- @mailbox = subject.class.mailbox_factory
+ @subject = subject
+ @mailbox = subject.class.mailbox_factory
+ @exit_handler = subject.class.exit_handler
+ @exclusives = subject.class.exclusive_methods
+
@tasks = Set.new
@links = Links.new
@signals = Signals.new
@receivers = Receivers.new
@timers = Timers.new
@@ -140,37 +156,33 @@
@signals.wait name
end
# Receive an asynchronous message
def receive(timeout = nil, &block)
- @receivers.receive(timeout, &block)
+ begin
+ @receivers.receive(timeout, &block)
+ rescue SystemEvent => event
+ handle_system_event(event)
+ retry
+ end
end
# Run the actor loop
def run
begin
while @running
- begin
- message = @mailbox.receive(timeout)
- rescue ExitEvent => exit_event
- Task.new(:exit_handler) { handle_exit_event exit_event }.resume
- retry
- rescue NamingRequest => ex
- @name = ex.name
- retry
- rescue TerminationRequest
- break
- end
-
- if message
+ if message = @mailbox.receive(timeout)
handle_message message
else
# No message indicates a timeout
@timers.fire
@receivers.fire_timers
end
end
+ rescue SystemEvent => event
+ handle_system_event event
+ retry
rescue MailboxShutdown
# If the mailbox detects shutdown, exit the actor
end
shutdown
@@ -193,55 +205,69 @@
end
end
# Schedule a block to run at the given time
def after(interval)
- @timers.add(interval) do
+ @timers.after(interval) do
Task.new(:timer) { yield }.resume
end
end
# Schedule a block to run at the given time
def every(interval)
- @timers.add(interval, true) do
+ @timers.every(interval) do
Task.new(:timer) { yield }.resume
end
end
# Sleep for the given amount of time
def sleep(interval)
if Celluloid.exclusive?
Kernel.sleep(interval)
else
task = Task.current
- @timers.add(interval) { task.resume }
+ @timers.after(interval) { task.resume }
Task.suspend :sleeping
end
end
- # Handle an incoming message
+ # Handle standard low-priority messages
def handle_message(message)
case message
when Call
- Task.new(:message_handler) { message.dispatch(@subject) }.resume
+ if @exclusives && @exclusives.include?(message.method)
+ exclusive { message.dispatch(@subject) }
+ else
+ Task.new(:message_handler) { message.dispatch(@subject) }.resume
+ end
when Response
- message.call.task.resume message
+ message.dispatch
else
@receivers.handle_message(message)
end
message
end
- # Handle exit events received by this actor
- def handle_exit_event(exit_event)
- exit_handler = @subject.class.exit_handler
- if exit_handler
- return @subject.send(exit_handler, exit_event.actor, exit_event.reason)
+ # Handle high-priority system event messages
+ def handle_system_event(event)
+ case event
+ when ExitEvent
+ Task.new(:exit_handler) { handle_exit_event event }.resume
+ when NamingRequest
+ @name = event.name
+ when TerminationRequest
+ @running = false
end
+ end
+ # Handle exit events received by this actor
+ def handle_exit_event(event)
+ # Run the exit handler if available
+ return @subject.send(@exit_handler, event.actor, event.reason) if @exit_handler
+
# Reraise exceptions from linked actors
# If no reason is given, actor terminated cleanly
- raise exit_event.reason if exit_event.reason
+ raise event.reason if event.reason
end
# Handle any exceptions that occur within a running actor
def handle_crash(exception)
Logger.crash("#{@subject.class} crashed!", exception)