lib/celluloid/zmq/reactor.rb in celluloid-zmq-0.0.1 vs lib/celluloid/zmq/reactor.rb in celluloid-zmq-0.0.2

- old
+ new

@@ -7,18 +7,14 @@ @waker = waker @poller = ::ZMQ::Poller.new @readers = {} @writers = {} - # FIXME: The way things are presently implemented is super ghetto - # The ZMQ::Poller should be able to wait on the waker somehow - # but I can't get it to work :( - #result = @poller.register(nil, ::ZMQ::POLLIN, @waker.io.fileno) - # - #unless ::ZMQ::Util.resultcode_ok?(result) - # raise "couldn't register waker with 0MQ poller" - #end + rc = @poller.register @waker.socket, ::ZMQ::POLLIN + unless ::ZMQ::Util.resultcode_ok? rc + raise "0MQ poll error: #{::ZMQ::Util.error_string}" + end end # Wait for the given ZMQ socket to become readable def wait_readable(socket) monitor_zmq socket, @readers, ::ZMQ::POLLIN @@ -45,29 +41,28 @@ end # Run the reactor, waiting for events, and calling the given block if # the reactor is awoken by the waker def run_once(timeout = nil) - # FIXME: This approach is super ghetto. Find some way to make the - # ZMQ::Poller wait on the waker's file descriptor - if @poller.size == 0 - readable, _ = select [@waker.io], [], [], timeout - yield if readable and readable.include? @waker.io - else - if ::ZMQ::Util.resultcode_ok? @poller.poll(100) - @poller.readables.each do |sock| - fiber = @readers.delete sock - fiber.resume if fiber - end + timeout ||= :blocking + rc = @poller.poll(timeout) - @poller.writables.each do |sock| - fiber = @writers.delete sock - fiber.resume if fiber - end + unless ::ZMQ::Util.resultcode_ok? rc + raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}" + end + + @poller.readables.each do |sock| + if sock == @waker.socket + yield + else + fiber = @readers.delete sock + fiber.resume if fiber end + end - readable, _ = select [@waker.io], [], [], 0 - yield if readable and readable.include? @waker.io + @poller.writables.each do |sock| + fiber = @writers.delete sock + fiber.resume if fiber end end end end end