lib/celluloid/actor.rb in celluloid-0.7.2 vs lib/celluloid/actor.rb in celluloid-0.8.0

- old
+ new

@@ -18,15 +18,12 @@ # Actors are Celluloid's concurrency primitive. They're implemented as # normal Ruby objects wrapped in threads which communicate with asynchronous # messages. class Actor extend Registry + attr_reader :proxy, :tasks, :links, :mailbox - attr_reader :proxy - attr_reader :links - attr_reader :mailbox - # Invoke a method on the given actor via its mailbox def self.call(mailbox, meth, *args, &block) call = SyncCall.new(Thread.mailbox, meth, args, block) begin @@ -34,19 +31,20 @@ rescue MailboxError raise DeadActorError, "attempted to call a dead actor" end if Celluloid.actor? - response = Thread.current[:actor].wait [:call, call.id] + # The current task will be automatically resumed when we get a response + Task.suspend :callwait else # Otherwise we're inside a normal thread, so block response = Thread.mailbox.receive do |msg| - msg.respond_to?(:call_id) and msg.call_id == call.id + msg.respond_to?(:call) and msg.call == call end - end - response.value + response.value + end end # Invoke a method asynchronously on an actor via its mailbox def self.async(mailbox, meth, *args, &block) begin @@ -57,28 +55,36 @@ # async calls should find other ways to deal with actors dying # during an async call (i.e. linking/supervisors) end end + # Call a method asynchronously and retrieve its value later + def self.future(mailbox, meth, *args, &block) + future = Future.new + future.execute(mailbox, meth, args, block) + future + end + # Wrap the given subject with an Actor def initialize(subject) @subject = subject if subject.respond_to? :mailbox_factory @mailbox = subject.mailbox_factory else @mailbox = Mailbox.new end + @proxy = ActorProxy.new(@mailbox, subject.class.to_s) + @tasks = Set.new @links = Links.new @signals = Signals.new @receivers = Receivers.new @timers = Timers.new - @proxy = ActorProxy.new(@mailbox, subject.class.to_s) @running = true - @thread = Pool.get do + @thread = ThreadPool.get do Thread.current[:actor] = self Thread.current[:mailbox] = @mailbox run end @@ -135,11 +141,11 @@ @running = false rescue Exception => ex @running = false handle_crash(ex) ensure - Pool.put @thread + ThreadPool.put @thread end # How long to wait until the next timer fires def timeout i1 = @timers.wait_interval @@ -152,31 +158,10 @@ else i2 end end - # Obtain a hash of tasks that are currently waiting - def tasks - # A hash of tasks to what they're waiting on is more meaningful to the - # end-user, and lets us make a copy of the tasks table, rather than - # handing them the one we're using internally across threads, a definite - # thread safety shared state no-no - tasks = {} - current_task = Task.current rescue nil - tasks[current_task] = :running if current_task - - @signals.waiting.each do |waitable, waiters| - if waiters.is_a? Enumerable - waiters.each { |waiter| tasks[waiter] = waitable } - else - tasks[waiters] = waitable - end - end - - tasks - end - # Schedule a block to run at the given time def after(interval) @timers.add(interval) do Task.new(:timer) { yield }.resume end @@ -184,24 +169,20 @@ # Sleep for the given amount of time def sleep(interval) task = Task.current @timers.add(interval) { task.resume } - Task.suspend + Task.suspend :sleeping end # Handle an incoming message def handle_message(message) case message when Call Task.new(:message_handler) { message.dispatch(@subject) }.resume when Response - handled_successfully = signal [:call, message.call_id], message - - unless handled_successfully - Logger.debug("anomalous message! spurious response to call #{message.call_id}") - end + message.call.task.resume message.value else @receivers.handle_message(message) end message end @@ -228,10 +209,10 @@ # Handle cleaning up this actor after it exits def cleanup(exit_event) @mailbox.shutdown @links.send_event exit_event - tasks.each { |task, _| task.terminate } + tasks.each { |task| task.terminate } begin @subject.finalize if @subject.respond_to? :finalize rescue Exception => ex Logger.crash("#{@subject.class}#finalize crashed!", ex)