lib/celluloid/actor.rb in celluloid-0.7.0 vs lib/celluloid/actor.rb in celluloid-0.7.1
- old
+ new
@@ -114,11 +114,11 @@
def run
while @running
begin
message = @mailbox.receive(timeout)
rescue ExitEvent => exit_event
- Task.new(:exit_handler) { handle_exit_event exit_event; nil }.resume
+ Task.new(:exit_handler) { handle_exit_event exit_event }.resume
retry
end
if message
handle_message message
@@ -159,24 +159,28 @@
# A hash of tasks to what they're waiting on is more meaningful to the
# end-user, and lets us make a copy of the tasks table, rather than
# handing them the one we're using internally across threads, a definite
# thread safety shared state no-no
tasks = {}
- current_task = Thread.current[:task]
+ current_task = Task.current rescue nil
tasks[current_task] = :running if current_task
- @signals.waiting.each do |waitable, task|
- tasks[task] = waitable
+ @signals.waiting.each do |waitable, waiters|
+ if waiters.is_a? Enumerable
+ waiters.each { |waiter| tasks[waiter] = waitable }
+ else
+ tasks[waiters] = waitable
+ end
end
tasks
end
# Schedule a block to run at the given time
def after(interval)
@timers.add(interval) do
- Task.new(:timer) { yield; nil }.resume
+ Task.new(:timer) { yield }.resume
end
end
# Sleep for the given amount of time
def sleep(interval)
@@ -187,11 +191,11 @@
# Handle an incoming message
def handle_message(message)
case message
when Call
- Task.new(:message_handler) { message.dispatch(@subject); nil }.resume
+ Task.new(:message_handler) { message.dispatch(@subject) }.resume
when Response
handled_successfully = signal [:call, message.call_id], message
unless handled_successfully
Logger.debug("anomalous message! spurious response to call #{message.call_id}")
@@ -224,9 +228,10 @@
# Handle cleaning up this actor after it exits
def cleanup(exit_event)
@mailbox.shutdown
@links.send_event exit_event
+ tasks.each { |task, _| task.terminate }
begin
@subject.finalize if @subject.respond_to? :finalize
rescue Exception => ex
Logger.crash("#{@subject.class}#finalize crashed!", ex)