celluloid-zmq/lib/celluloid/zmq/reactor.rb in dcell-0.7.1 vs celluloid-zmq/lib/celluloid/zmq/reactor.rb in dcell-0.8.0
- old
+ new
@@ -1,12 +1,17 @@
module Celluloid
module ZMQ
# React to incoming 0MQ and Celluloid events. This is kinda sorta supposed
# to resemble the Reactor design pattern.
class Reactor
- def initialize(waker)
- @waker = waker
+ extend Forwardable
+
+ def_delegator :@waker, :signal, :wakeup
+ def_delegator :@waker, :cleanup, :shutdown
+
+ def initialize
+ @waker = Waker.new
@poller = ::ZMQ::Poller.new
@readers = {}
@writers = {}
rc = @poller.register @waker.socket, ::ZMQ::POLLIN
@@ -28,17 +33,16 @@
# Monitor the given ZMQ socket with the given options
def monitor_zmq(socket, set, type)
if set.has_key? socket
raise ArgumentError, "another method is already waiting on #{socket.inspect}"
else
- set[socket] = Fiber.current
+ set[socket] = Task.current
end
@poller.register socket, type
- Fiber.yield
- @poller.deregister socket, type
+ Task.suspend :zmqwait
socket
end
# Run the reactor, waiting for events, and calling the given block if
# the reactor is awoken by the waker
@@ -55,19 +59,31 @@
raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}"
end
@poller.readables.each do |sock|
if sock == @waker.socket
- yield
+ @waker.wait
else
- fiber = @readers.delete sock
- fiber.resume if fiber
+ task = @readers.delete sock
+ @poller.deregister sock, ::ZMQ::POLLIN
+
+ if task
+ task.resume
+ else
+ Celluloid::Logger.debug "ZMQ error: got read event without associated reader"
+ end
end
end
@poller.writables.each do |sock|
- fiber = @writers.delete sock
- fiber.resume if fiber
+ task = @writers.delete sock
+ @poller.deregister sock, ::ZMQ::POLLOUT
+
+ if task
+ task.resume
+ else
+ Celluloid::Logger.debug "ZMQ error: got write event without associated reader"
+ end
end
end
end
end
end