lib/celluloid/actor.rb in celluloid-0.8.0 vs lib/celluloid/actor.rb in celluloid-0.9.0

- old
+ new

@@ -20,83 +20,102 @@ # messages. class Actor extend Registry attr_reader :proxy, :tasks, :links, :mailbox - # Invoke a method on the given actor via its mailbox - def self.call(mailbox, meth, *args, &block) - call = SyncCall.new(Thread.mailbox, meth, args, block) + class << self + # Invoke a method on the given actor via its mailbox + def call(mailbox, meth, *args, &block) + call = SyncCall.new(Thread.mailbox, meth, args, block) - begin - mailbox << call - rescue MailboxError - raise DeadActorError, "attempted to call a dead actor" + begin + mailbox << call + rescue MailboxError + raise DeadActorError, "attempted to call a dead actor" + end + + if Celluloid.actor? and not Celluloid.exclusive? + # The current task will be automatically resumed when we get a response + Task.suspend(:callwait).value + else + # Otherwise we're inside a normal thread, so block + response = Thread.mailbox.receive do |msg| + msg.respond_to?(:call) and msg.call == call + end + + response.value + end end - if Celluloid.actor? - # The current task will be automatically resumed when we get a response - Task.suspend :callwait - else - # Otherwise we're inside a normal thread, so block - response = Thread.mailbox.receive do |msg| - msg.respond_to?(:call) and msg.call == call + # Invoke a method asynchronously on an actor via its mailbox + def async(mailbox, meth, *args, &block) + begin + mailbox << AsyncCall.new(Thread.mailbox, meth, args, block) + rescue MailboxError + # Silently swallow asynchronous calls to dead actors. There's no way + # to reliably generate DeadActorErrors for async calls, so users of + # async calls should find other ways to deal with actors dying + # during an async call (i.e. linking/supervisors) end + end - response.value + # Call a method asynchronously and retrieve its value later + def future(mailbox, meth, *args, &block) + future = Future.new + future.execute(mailbox, meth, args, block) + future end - end - # Invoke a method asynchronously on an actor via its mailbox - def self.async(mailbox, meth, *args, &block) - begin - mailbox << AsyncCall.new(Thread.mailbox, meth, args, block) - rescue MailboxError - # Silently swallow asynchronous calls to dead actors. There's no way - # to reliably generate DeadActorErrors for async calls, so users of - # async calls should find other ways to deal with actors dying - # during an async call (i.e. linking/supervisors) + # Obtain all running actors in the system + def all + actors = [] + Thread.list.each do |t| + actor = t[:actor] + actors << actor.proxy if actor + end + actors end end - # Call a method asynchronously and retrieve its value later - def self.future(mailbox, meth, *args, &block) - future = Future.new - future.execute(mailbox, meth, args, block) - future - end - # Wrap the given subject with an Actor def initialize(subject) - @subject = subject - - if subject.respond_to? :mailbox_factory - @mailbox = subject.mailbox_factory - else - @mailbox = Mailbox.new - end - + @subject = subject + @mailbox = subject.class.mailbox_factory @proxy = ActorProxy.new(@mailbox, subject.class.to_s) @tasks = Set.new @links = Links.new @signals = Signals.new @receivers = Receivers.new @timers = Timers.new @running = true + @exclusive = false @thread = ThreadPool.get do Thread.current[:actor] = self Thread.current[:mailbox] = @mailbox - run end end # Is this actor alive? def alive? @running end + # Is this actor running in exclusive mode? + def exclusive? + @exclusive + end + + # Execute a code block in exclusive mode. + def exclusive + @exclusive = true + yield + ensure + @exclusive = false + end + # Terminate this actor def terminate @running = false nil end @@ -116,36 +135,34 @@ @receivers.receive(timeout, &block) end # Run the actor loop def run - while @running - begin - message = @mailbox.receive(timeout) - rescue ExitEvent => exit_event - Task.new(:exit_handler) { handle_exit_event exit_event }.resume - retry - end + begin + while @running + begin + message = @mailbox.receive(timeout) + rescue ExitEvent => exit_event + Task.new(:exit_handler) { handle_exit_event exit_event }.resume + retry + end - if message - handle_message message - else - # No message indicates a timeout - @timers.fire - @receivers.fire_timers + if message + handle_message message + else + # No message indicates a timeout + @timers.fire + @receivers.fire_timers + end end + rescue MailboxShutdown + # If the mailbox detects shutdown, exit the actor end - cleanup ExitEvent.new(@proxy) - rescue MailboxShutdown - # If the mailbox detects shutdown, exit the actor - @running = false - rescue Exception => ex - @running = false + shutdown + rescue => ex handle_crash(ex) - ensure - ThreadPool.put @thread end # How long to wait until the next timer fires def timeout i1 = @timers.wait_interval @@ -167,22 +184,26 @@ end end # Sleep for the given amount of time def sleep(interval) - task = Task.current - @timers.add(interval) { task.resume } - Task.suspend :sleeping + if Celluloid.exclusive? + Kernel.sleep(interval) + else + task = Task.current + @timers.add(interval) { task.resume } + Task.suspend :sleeping + end end # Handle an incoming message def handle_message(message) case message when Call Task.new(:message_handler) { message.dispatch(@subject) }.resume when Response - message.call.task.resume message.value + message.call.task.resume message else @receivers.handle_message(message) end message end @@ -200,24 +221,36 @@ end # Handle any exceptions that occur within a running actor def handle_crash(exception) Logger.crash("#{@subject.class} crashed!", exception) - cleanup ExitEvent.new(@proxy, exception) - rescue Exception => ex + shutdown ExitEvent.new(@proxy, exception) + rescue => ex Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", ex) end # Handle cleaning up this actor after it exits + def shutdown(exit_event = ExitEvent.new(@proxy)) + run_finalizer + cleanup exit_event + ensure + Thread.current[:actor] = nil + Thread.current[:mailbox] = nil + end + + # Run the user-defined finalizer, if one is set + def run_finalizer + @subject.finalize if @subject.respond_to? :finalize + rescue => ex + Logger.crash("#{@subject.class}#finalize crashed!", ex) + end + + # Clean up after this actor def cleanup(exit_event) @mailbox.shutdown @links.send_event exit_event tasks.each { |task| task.terminate } - - begin - @subject.finalize if @subject.respond_to? :finalize - rescue Exception => ex - Logger.crash("#{@subject.class}#finalize crashed!", ex) - end + rescue => ex + Logger.crash("#{@subject.class}: CLEANUP CRASHED!", ex) end end end