Sha256: 7e1ff983cc132415b4bcb3d66e11923bd30dfcf85e99f79767f21ddf55b67390
Contents?: true
Size: 1.88 KB
Versions: 2
Compression:
Stored size: 1.88 KB
Contents
module Celluloid module ZMQ # React to incoming 0MQ and Celluloid events. This is kinda sorta supposed # to resemble the Reactor design pattern. class Reactor def initialize(waker) @waker = waker @poller = ::ZMQ::Poller.new @readers = {} @writers = {} rc = @poller.register @waker.socket, ::ZMQ::POLLIN unless ::ZMQ::Util.resultcode_ok? rc raise "0MQ poll error: #{::ZMQ::Util.error_string}" end end # Wait for the given ZMQ socket to become readable def wait_readable(socket) monitor_zmq socket, @readers, ::ZMQ::POLLIN end # Wait for the given ZMQ socket to become writeable def wait_writeable(socket) monitor_zmq socket, @writers, ::ZMQ::POLLOUT end # Monitor the given ZMQ socket with the given options def monitor_zmq(socket, set, type) if set.has_key? socket raise ArgumentError, "another method is already waiting on #{socket.inspect}" else set[socket] = Fiber.current end @poller.register socket, type Fiber.yield @poller.deregister socket, type socket end # Run the reactor, waiting for events, and calling the given block if # the reactor is awoken by the waker def run_once(timeout = nil) timeout ||= :blocking rc = @poller.poll(timeout) unless ::ZMQ::Util.resultcode_ok? rc raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}" end @poller.readables.each do |sock| if sock == @waker.socket yield else fiber = @readers.delete sock fiber.resume if fiber end end @poller.writables.each do |sock| fiber = @writers.delete sock fiber.resume if fiber end end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
celluloid-zmq-0.0.3 | lib/celluloid/zmq/reactor.rb |
celluloid-zmq-0.0.2 | lib/celluloid/zmq/reactor.rb |