lib/celluloid/io/mailbox.rb in celluloid-io-0.7.0 vs lib/celluloid/io/mailbox.rb in celluloid-io-0.8.0

- old
+ new

@@ -1,72 +1,69 @@ module Celluloid module IO - # An alternative implementation of Celluloid::Mailbox using Wakers + # An alternative implementation of Celluloid::Mailbox using Reactor class Mailbox < Celluloid::Mailbox - attr_reader :reactor, :waker + attr_reader :reactor def initialize @messages = [] @lock = Mutex.new - @waker = Waker.new - @reactor = Reactor.new(@waker) + @reactor = Reactor.new end # Add a message to the Mailbox def <<(message) @lock.synchronize do @messages << message - @waker.signal + @reactor.wakeup end nil - rescue DeadWakerError + rescue IOError raise MailboxError, "dead recipient" end # Add a high-priority system event to the Mailbox def system_event(event) @lock.synchronize do @messages.unshift event - + begin - @waker.signal - rescue DeadWakerError + @reactor.wakeup + rescue IOError # Silently fail if messages are sent to dead actors end end nil end # Receive a message from the Mailbox def receive(timeout = nil, &block) - message = nil + message = next_message(&block) - begin + until message if timeout - now = Time.now - wait_until ||= now + timeout - wait_interval = wait_until - now - return if wait_interval < 0 - else - wait_interval = nil - end - - @reactor.run_once(wait_interval) do - @waker.wait - message = next_message(&block) + now = Time.now + wait_until ||= now + timeout + wait_interval = wait_until - now + return if wait_interval < 0 + else + wait_interval = nil end - end until message + + @reactor.run_once(wait_interval) + message = next_message(&block) + end message - rescue IOError, DeadWakerError + rescue IOError shutdown # force shutdown of the mailbox raise MailboxShutdown, "mailbox shutdown called during receive" end # Cleanup any IO objects this Mailbox may be using def shutdown - @waker.cleanup + @reactor.shutdown super end end end -end \ No newline at end of file +end