celluloid-zmq/lib/celluloid/zmq/reactor.rb in dcell-0.7.1 vs celluloid-zmq/lib/celluloid/zmq/reactor.rb in dcell-0.8.0

- old
+ new

@@ -1,12 +1,17 @@ module Celluloid module ZMQ # React to incoming 0MQ and Celluloid events. This is kinda sorta supposed # to resemble the Reactor design pattern. class Reactor - def initialize(waker) - @waker = waker + extend Forwardable + + def_delegator :@waker, :signal, :wakeup + def_delegator :@waker, :cleanup, :shutdown + + def initialize + @waker = Waker.new @poller = ::ZMQ::Poller.new @readers = {} @writers = {} rc = @poller.register @waker.socket, ::ZMQ::POLLIN @@ -28,17 +33,16 @@ # Monitor the given ZMQ socket with the given options def monitor_zmq(socket, set, type) if set.has_key? socket raise ArgumentError, "another method is already waiting on #{socket.inspect}" else - set[socket] = Fiber.current + set[socket] = Task.current end @poller.register socket, type - Fiber.yield - @poller.deregister socket, type + Task.suspend :zmqwait socket end # Run the reactor, waiting for events, and calling the given block if # the reactor is awoken by the waker @@ -55,19 +59,31 @@ raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}" end @poller.readables.each do |sock| if sock == @waker.socket - yield + @waker.wait else - fiber = @readers.delete sock - fiber.resume if fiber + task = @readers.delete sock + @poller.deregister sock, ::ZMQ::POLLIN + + if task + task.resume + else + Celluloid::Logger.debug "ZMQ error: got read event without associated reader" + end end end @poller.writables.each do |sock| - fiber = @writers.delete sock - fiber.resume if fiber + task = @writers.delete sock + @poller.deregister sock, ::ZMQ::POLLOUT + + if task + task.resume + else + Celluloid::Logger.debug "ZMQ error: got write event without associated reader" + end end end end end end