lib/celluloid/io/mailbox.rb in celluloid-io-0.7.0 vs lib/celluloid/io/mailbox.rb in celluloid-io-0.8.0
- old
+ new
@@ -1,72 +1,69 @@
module Celluloid
module IO
- # An alternative implementation of Celluloid::Mailbox using Wakers
+ # An alternative implementation of Celluloid::Mailbox using Reactor
class Mailbox < Celluloid::Mailbox
- attr_reader :reactor, :waker
+ attr_reader :reactor
def initialize
@messages = []
@lock = Mutex.new
- @waker = Waker.new
- @reactor = Reactor.new(@waker)
+ @reactor = Reactor.new
end
# Add a message to the Mailbox
def <<(message)
@lock.synchronize do
@messages << message
- @waker.signal
+ @reactor.wakeup
end
nil
- rescue DeadWakerError
+ rescue IOError
raise MailboxError, "dead recipient"
end
# Add a high-priority system event to the Mailbox
def system_event(event)
@lock.synchronize do
@messages.unshift event
-
+
begin
- @waker.signal
- rescue DeadWakerError
+ @reactor.wakeup
+ rescue IOError
# Silently fail if messages are sent to dead actors
end
end
nil
end
# Receive a message from the Mailbox
def receive(timeout = nil, &block)
- message = nil
+ message = next_message(&block)
- begin
+ until message
if timeout
- now = Time.now
- wait_until ||= now + timeout
- wait_interval = wait_until - now
- return if wait_interval < 0
- else
- wait_interval = nil
- end
-
- @reactor.run_once(wait_interval) do
- @waker.wait
- message = next_message(&block)
+ now = Time.now
+ wait_until ||= now + timeout
+ wait_interval = wait_until - now
+ return if wait_interval < 0
+ else
+ wait_interval = nil
end
- end until message
+
+ @reactor.run_once(wait_interval)
+ message = next_message(&block)
+ end
message
- rescue IOError, DeadWakerError
+ rescue IOError
shutdown # force shutdown of the mailbox
raise MailboxShutdown, "mailbox shutdown called during receive"
end
# Cleanup any IO objects this Mailbox may be using
def shutdown
- @waker.cleanup
+ @reactor.shutdown
super
end
end
end
-end
\ No newline at end of file
+end