lib/celluloid/actor.rb in celluloid-0.7.0 vs lib/celluloid/actor.rb in celluloid-0.7.1

- old
+ new

@@ -114,11 +114,11 @@ def run while @running begin message = @mailbox.receive(timeout) rescue ExitEvent => exit_event - Task.new(:exit_handler) { handle_exit_event exit_event; nil }.resume + Task.new(:exit_handler) { handle_exit_event exit_event }.resume retry end if message handle_message message @@ -159,24 +159,28 @@ # A hash of tasks to what they're waiting on is more meaningful to the # end-user, and lets us make a copy of the tasks table, rather than # handing them the one we're using internally across threads, a definite # thread safety shared state no-no tasks = {} - current_task = Thread.current[:task] + current_task = Task.current rescue nil tasks[current_task] = :running if current_task - @signals.waiting.each do |waitable, task| - tasks[task] = waitable + @signals.waiting.each do |waitable, waiters| + if waiters.is_a? Enumerable + waiters.each { |waiter| tasks[waiter] = waitable } + else + tasks[waiters] = waitable + end end tasks end # Schedule a block to run at the given time def after(interval) @timers.add(interval) do - Task.new(:timer) { yield; nil }.resume + Task.new(:timer) { yield }.resume end end # Sleep for the given amount of time def sleep(interval) @@ -187,11 +191,11 @@ # Handle an incoming message def handle_message(message) case message when Call - Task.new(:message_handler) { message.dispatch(@subject); nil }.resume + Task.new(:message_handler) { message.dispatch(@subject) }.resume when Response handled_successfully = signal [:call, message.call_id], message unless handled_successfully Logger.debug("anomalous message! spurious response to call #{message.call_id}") @@ -224,9 +228,10 @@ # Handle cleaning up this actor after it exits def cleanup(exit_event) @mailbox.shutdown @links.send_event exit_event + tasks.each { |task, _| task.terminate } begin @subject.finalize if @subject.respond_to? :finalize rescue Exception => ex Logger.crash("#{@subject.class}#finalize crashed!", ex)