lib/celluloid/actor.rb in celluloid-0.11.0 vs lib/celluloid/actor.rb in celluloid-0.11.1

- old
+ new

@@ -1,5 +1,7 @@ +require 'timers' + module Celluloid # Don't do Actor-like things outside Actor scope class NotActorError < StandardError; end # Trying to do something to a dead actor @@ -17,14 +19,25 @@ # Actors are Celluloid's concurrency primitive. They're implemented as # normal Ruby objects wrapped in threads which communicate with asynchronous # messages. class Actor - extend Registry attr_reader :subject, :proxy, :tasks, :links, :mailbox, :thread, :name class << self + extend Forwardable + + def_delegators "Celluloid::Registry.root", :[], :[]= + + def registered + Registry.root.names + end + + def clear_registry + Registry.root.clear + end + # Obtain the current actor def current actor = Thread.current[:actor] raise NotActorError, "not in actor scope" unless actor actor.proxy @@ -61,11 +74,11 @@ end # Invoke a method asynchronously on an actor via its mailbox def async(mailbox, meth, *args, &block) begin - mailbox << AsyncCall.new(Thread.mailbox, meth, args, block) + mailbox << AsyncCall.new(meth, args, block) rescue MailboxError # Silently swallow asynchronous calls to dead actors. There's no way # to reliably generate DeadActorErrors for async calls, so users of # async calls should find other ways to deal with actors dying # during an async call (i.e. linking/supervisors) @@ -90,12 +103,15 @@ end end # Wrap the given subject with an Actor def initialize(subject) - @subject = subject - @mailbox = subject.class.mailbox_factory + @subject = subject + @mailbox = subject.class.mailbox_factory + @exit_handler = subject.class.exit_handler + @exclusives = subject.class.exclusive_methods + @tasks = Set.new @links = Links.new @signals = Signals.new @receivers = Receivers.new @timers = Timers.new @@ -140,37 +156,33 @@ @signals.wait name end # Receive an asynchronous message def receive(timeout = nil, &block) - @receivers.receive(timeout, &block) + begin + @receivers.receive(timeout, &block) + rescue SystemEvent => event + handle_system_event(event) + retry + end end # Run the actor loop def run begin while @running - begin - message = @mailbox.receive(timeout) - rescue ExitEvent => exit_event - Task.new(:exit_handler) { handle_exit_event exit_event }.resume - retry - rescue NamingRequest => ex - @name = ex.name - retry - rescue TerminationRequest - break - end - - if message + if message = @mailbox.receive(timeout) handle_message message else # No message indicates a timeout @timers.fire @receivers.fire_timers end end + rescue SystemEvent => event + handle_system_event event + retry rescue MailboxShutdown # If the mailbox detects shutdown, exit the actor end shutdown @@ -193,55 +205,69 @@ end end # Schedule a block to run at the given time def after(interval) - @timers.add(interval) do + @timers.after(interval) do Task.new(:timer) { yield }.resume end end # Schedule a block to run at the given time def every(interval) - @timers.add(interval, true) do + @timers.every(interval) do Task.new(:timer) { yield }.resume end end # Sleep for the given amount of time def sleep(interval) if Celluloid.exclusive? Kernel.sleep(interval) else task = Task.current - @timers.add(interval) { task.resume } + @timers.after(interval) { task.resume } Task.suspend :sleeping end end - # Handle an incoming message + # Handle standard low-priority messages def handle_message(message) case message when Call - Task.new(:message_handler) { message.dispatch(@subject) }.resume + if @exclusives && @exclusives.include?(message.method) + exclusive { message.dispatch(@subject) } + else + Task.new(:message_handler) { message.dispatch(@subject) }.resume + end when Response - message.call.task.resume message + message.dispatch else @receivers.handle_message(message) end message end - # Handle exit events received by this actor - def handle_exit_event(exit_event) - exit_handler = @subject.class.exit_handler - if exit_handler - return @subject.send(exit_handler, exit_event.actor, exit_event.reason) + # Handle high-priority system event messages + def handle_system_event(event) + case event + when ExitEvent + Task.new(:exit_handler) { handle_exit_event event }.resume + when NamingRequest + @name = event.name + when TerminationRequest + @running = false end + end + # Handle exit events received by this actor + def handle_exit_event(event) + # Run the exit handler if available + return @subject.send(@exit_handler, event.actor, event.reason) if @exit_handler + # Reraise exceptions from linked actors # If no reason is given, actor terminated cleanly - raise exit_event.reason if exit_event.reason + raise event.reason if event.reason end # Handle any exceptions that occur within a running actor def handle_crash(exception) Logger.crash("#{@subject.class} crashed!", exception)