lib/celluloid/actor.rb in celluloid-0.6.2 vs lib/celluloid/actor.rb in celluloid-0.7.0

- old
+ new

@@ -34,12 +34,11 @@ rescue MailboxError raise DeadActorError, "attempted to call a dead actor" end if Celluloid.actor? - # Yield to the actor scheduler, which resumes us when we get a response - response = Fiber.yield(call) + response = Thread.current[:actor].wait [:call, call.id] else # Otherwise we're inside a normal thread, so block response = Thread.mailbox.receive do |msg| msg.respond_to?(:call_id) and msg.call_id == call.id end @@ -65,19 +64,19 @@ @subject = subject if subject.respond_to? :mailbox_factory @mailbox = subject.mailbox_factory else - @mailbox = Celluloid::Mailbox.new + @mailbox = Mailbox.new end @links = Links.new @signals = Signals.new @receivers = Receivers.new + @timers = Timers.new @proxy = ActorProxy.new(@mailbox, self.class.to_s) @running = true - @pending_calls = {} @thread = Pool.get do Thread.current[:actor] = self Thread.current[:mailbox] = @mailbox @@ -105,25 +104,31 @@ def wait(name) @signals.wait name end # Receive an asynchronous message - def receive(&block) - @receivers.receive(&block) + def receive(timeout = nil, &block) + @receivers.receive(timeout, &block) end # Run the actor loop def run while @running begin - message = @mailbox.receive + message = @mailbox.receive(timeout) rescue ExitEvent => exit_event - Celluloid::Fiber.new { handle_exit_event exit_event; nil }.resume + Task.new(:exit_handler) { handle_exit_event exit_event; nil }.resume retry end - handle_message message + if message + handle_message message + else + # No message indicates a timeout + @timers.fire + @receivers.fire_timers + end end cleanup ExitEvent.new(@proxy) rescue MailboxShutdown # If the mailbox detects shutdown, exit the actor @@ -133,28 +138,65 @@ handle_crash(ex) ensure Pool.put @thread end - # Register a fiber waiting for the response to a Celluloid::Call - def register_fiber(call, fiber) - raise ArgumentError, "attempted to register a dead fiber" unless fiber.alive? - @pending_calls[call.id] = fiber + # How long to wait until the next timer fires + def timeout + i1 = @timers.wait_interval + i2 = @receivers.wait_interval + + if i1 and i2 + i1 < i2 ? i1 : i2 + elsif i1 + i1 + else + i2 + end end + # Obtain a hash of tasks that are currently waiting + def tasks + # A hash of tasks to what they're waiting on is more meaningful to the + # end-user, and lets us make a copy of the tasks table, rather than + # handing them the one we're using internally across threads, a definite + # thread safety shared state no-no + tasks = {} + current_task = Thread.current[:task] + tasks[current_task] = :running if current_task + + @signals.waiting.each do |waitable, task| + tasks[task] = waitable + end + + tasks + end + + # Schedule a block to run at the given time + def after(interval) + @timers.add(interval) do + Task.new(:timer) { yield; nil }.resume + end + end + + # Sleep for the given amount of time + def sleep(interval) + task = Task.current + @timers.add(interval) { task.resume } + Task.suspend + end + # Handle an incoming message def handle_message(message) case message when Call - Celluloid::Fiber.new { message.dispatch(@subject); nil }.resume + Task.new(:message_handler) { message.dispatch(@subject); nil }.resume when Response - fiber = @pending_calls.delete(message.call_id) + handled_successfully = signal [:call, message.call_id], message - if fiber - fiber.resume message - else - Celluloid::Logger.debug("spurious response to call #{message.call_id}") + unless handled_successfully + Logger.debug("anomalous message! spurious response to call #{message.call_id}") end else @receivers.handle_message(message) end message @@ -172,24 +214,24 @@ raise exit_event.reason if exit_event.reason end # Handle any exceptions that occur within a running actor def handle_crash(exception) - Celluloid::Logger.crash("#{@subject.class} crashed!", exception) + Logger.crash("#{@subject.class} crashed!", exception) cleanup ExitEvent.new(@proxy, exception) rescue Exception => ex - Celluloid::Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", ex) + Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", ex) end # Handle cleaning up this actor after it exits def cleanup(exit_event) @mailbox.shutdown @links.send_event exit_event begin @subject.finalize if @subject.respond_to? :finalize rescue Exception => ex - Celluloid::Logger.crash("#{@subject.class}#finalize crashed!", ex) + Logger.crash("#{@subject.class}#finalize crashed!", ex) end end end end