lib/celluloid/actor.rb in celluloid-0.8.0 vs lib/celluloid/actor.rb in celluloid-0.9.0
- old
+ new
@@ -20,83 +20,102 @@
# messages.
class Actor
extend Registry
attr_reader :proxy, :tasks, :links, :mailbox
- # Invoke a method on the given actor via its mailbox
- def self.call(mailbox, meth, *args, &block)
- call = SyncCall.new(Thread.mailbox, meth, args, block)
+ class << self
+ # Invoke a method on the given actor via its mailbox
+ def call(mailbox, meth, *args, &block)
+ call = SyncCall.new(Thread.mailbox, meth, args, block)
- begin
- mailbox << call
- rescue MailboxError
- raise DeadActorError, "attempted to call a dead actor"
+ begin
+ mailbox << call
+ rescue MailboxError
+ raise DeadActorError, "attempted to call a dead actor"
+ end
+
+ if Celluloid.actor? and not Celluloid.exclusive?
+ # The current task will be automatically resumed when we get a response
+ Task.suspend(:callwait).value
+ else
+ # Otherwise we're inside a normal thread, so block
+ response = Thread.mailbox.receive do |msg|
+ msg.respond_to?(:call) and msg.call == call
+ end
+
+ response.value
+ end
end
- if Celluloid.actor?
- # The current task will be automatically resumed when we get a response
- Task.suspend :callwait
- else
- # Otherwise we're inside a normal thread, so block
- response = Thread.mailbox.receive do |msg|
- msg.respond_to?(:call) and msg.call == call
+ # Invoke a method asynchronously on an actor via its mailbox
+ def async(mailbox, meth, *args, &block)
+ begin
+ mailbox << AsyncCall.new(Thread.mailbox, meth, args, block)
+ rescue MailboxError
+ # Silently swallow asynchronous calls to dead actors. There's no way
+ # to reliably generate DeadActorErrors for async calls, so users of
+ # async calls should find other ways to deal with actors dying
+ # during an async call (i.e. linking/supervisors)
end
+ end
- response.value
+ # Call a method asynchronously and retrieve its value later
+ def future(mailbox, meth, *args, &block)
+ future = Future.new
+ future.execute(mailbox, meth, args, block)
+ future
end
- end
- # Invoke a method asynchronously on an actor via its mailbox
- def self.async(mailbox, meth, *args, &block)
- begin
- mailbox << AsyncCall.new(Thread.mailbox, meth, args, block)
- rescue MailboxError
- # Silently swallow asynchronous calls to dead actors. There's no way
- # to reliably generate DeadActorErrors for async calls, so users of
- # async calls should find other ways to deal with actors dying
- # during an async call (i.e. linking/supervisors)
+ # Obtain all running actors in the system
+ def all
+ actors = []
+ Thread.list.each do |t|
+ actor = t[:actor]
+ actors << actor.proxy if actor
+ end
+ actors
end
end
- # Call a method asynchronously and retrieve its value later
- def self.future(mailbox, meth, *args, &block)
- future = Future.new
- future.execute(mailbox, meth, args, block)
- future
- end
-
# Wrap the given subject with an Actor
def initialize(subject)
- @subject = subject
-
- if subject.respond_to? :mailbox_factory
- @mailbox = subject.mailbox_factory
- else
- @mailbox = Mailbox.new
- end
-
+ @subject = subject
+ @mailbox = subject.class.mailbox_factory
@proxy = ActorProxy.new(@mailbox, subject.class.to_s)
@tasks = Set.new
@links = Links.new
@signals = Signals.new
@receivers = Receivers.new
@timers = Timers.new
@running = true
+ @exclusive = false
@thread = ThreadPool.get do
Thread.current[:actor] = self
Thread.current[:mailbox] = @mailbox
-
run
end
end
# Is this actor alive?
def alive?
@running
end
+ # Is this actor running in exclusive mode?
+ def exclusive?
+ @exclusive
+ end
+
+ # Execute a code block in exclusive mode.
+ def exclusive
+ @exclusive = true
+ yield
+ ensure
+ @exclusive = false
+ end
+
# Terminate this actor
def terminate
@running = false
nil
end
@@ -116,36 +135,34 @@
@receivers.receive(timeout, &block)
end
# Run the actor loop
def run
- while @running
- begin
- message = @mailbox.receive(timeout)
- rescue ExitEvent => exit_event
- Task.new(:exit_handler) { handle_exit_event exit_event }.resume
- retry
- end
+ begin
+ while @running
+ begin
+ message = @mailbox.receive(timeout)
+ rescue ExitEvent => exit_event
+ Task.new(:exit_handler) { handle_exit_event exit_event }.resume
+ retry
+ end
- if message
- handle_message message
- else
- # No message indicates a timeout
- @timers.fire
- @receivers.fire_timers
+ if message
+ handle_message message
+ else
+ # No message indicates a timeout
+ @timers.fire
+ @receivers.fire_timers
+ end
end
+ rescue MailboxShutdown
+ # If the mailbox detects shutdown, exit the actor
end
- cleanup ExitEvent.new(@proxy)
- rescue MailboxShutdown
- # If the mailbox detects shutdown, exit the actor
- @running = false
- rescue Exception => ex
- @running = false
+ shutdown
+ rescue => ex
handle_crash(ex)
- ensure
- ThreadPool.put @thread
end
# How long to wait until the next timer fires
def timeout
i1 = @timers.wait_interval
@@ -167,22 +184,26 @@
end
end
# Sleep for the given amount of time
def sleep(interval)
- task = Task.current
- @timers.add(interval) { task.resume }
- Task.suspend :sleeping
+ if Celluloid.exclusive?
+ Kernel.sleep(interval)
+ else
+ task = Task.current
+ @timers.add(interval) { task.resume }
+ Task.suspend :sleeping
+ end
end
# Handle an incoming message
def handle_message(message)
case message
when Call
Task.new(:message_handler) { message.dispatch(@subject) }.resume
when Response
- message.call.task.resume message.value
+ message.call.task.resume message
else
@receivers.handle_message(message)
end
message
end
@@ -200,24 +221,36 @@
end
# Handle any exceptions that occur within a running actor
def handle_crash(exception)
Logger.crash("#{@subject.class} crashed!", exception)
- cleanup ExitEvent.new(@proxy, exception)
- rescue Exception => ex
+ shutdown ExitEvent.new(@proxy, exception)
+ rescue => ex
Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", ex)
end
# Handle cleaning up this actor after it exits
+ def shutdown(exit_event = ExitEvent.new(@proxy))
+ run_finalizer
+ cleanup exit_event
+ ensure
+ Thread.current[:actor] = nil
+ Thread.current[:mailbox] = nil
+ end
+
+ # Run the user-defined finalizer, if one is set
+ def run_finalizer
+ @subject.finalize if @subject.respond_to? :finalize
+ rescue => ex
+ Logger.crash("#{@subject.class}#finalize crashed!", ex)
+ end
+
+ # Clean up after this actor
def cleanup(exit_event)
@mailbox.shutdown
@links.send_event exit_event
tasks.each { |task| task.terminate }
-
- begin
- @subject.finalize if @subject.respond_to? :finalize
- rescue Exception => ex
- Logger.crash("#{@subject.class}#finalize crashed!", ex)
- end
+ rescue => ex
+ Logger.crash("#{@subject.class}: CLEANUP CRASHED!", ex)
end
end
end