lib/celluloid/zmq/reactor.rb in celluloid-zmq-0.0.1 vs lib/celluloid/zmq/reactor.rb in celluloid-zmq-0.0.2
- old
+ new
@@ -7,18 +7,14 @@
@waker = waker
@poller = ::ZMQ::Poller.new
@readers = {}
@writers = {}
- # FIXME: The way things are presently implemented is super ghetto
- # The ZMQ::Poller should be able to wait on the waker somehow
- # but I can't get it to work :(
- #result = @poller.register(nil, ::ZMQ::POLLIN, @waker.io.fileno)
- #
- #unless ::ZMQ::Util.resultcode_ok?(result)
- # raise "couldn't register waker with 0MQ poller"
- #end
+ rc = @poller.register @waker.socket, ::ZMQ::POLLIN
+ unless ::ZMQ::Util.resultcode_ok? rc
+ raise "0MQ poll error: #{::ZMQ::Util.error_string}"
+ end
end
# Wait for the given ZMQ socket to become readable
def wait_readable(socket)
monitor_zmq socket, @readers, ::ZMQ::POLLIN
@@ -45,29 +41,28 @@
end
# Run the reactor, waiting for events, and calling the given block if
# the reactor is awoken by the waker
def run_once(timeout = nil)
- # FIXME: This approach is super ghetto. Find some way to make the
- # ZMQ::Poller wait on the waker's file descriptor
- if @poller.size == 0
- readable, _ = select [@waker.io], [], [], timeout
- yield if readable and readable.include? @waker.io
- else
- if ::ZMQ::Util.resultcode_ok? @poller.poll(100)
- @poller.readables.each do |sock|
- fiber = @readers.delete sock
- fiber.resume if fiber
- end
+ timeout ||= :blocking
+ rc = @poller.poll(timeout)
- @poller.writables.each do |sock|
- fiber = @writers.delete sock
- fiber.resume if fiber
- end
+ unless ::ZMQ::Util.resultcode_ok? rc
+ raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}"
+ end
+
+ @poller.readables.each do |sock|
+ if sock == @waker.socket
+ yield
+ else
+ fiber = @readers.delete sock
+ fiber.resume if fiber
end
+ end
- readable, _ = select [@waker.io], [], [], 0
- yield if readable and readable.include? @waker.io
+ @poller.writables.each do |sock|
+ fiber = @writers.delete sock
+ fiber.resume if fiber
end
end
end
end
end