Sha256: 2a62a1bf30743d4aec64510c7c55118afecb29fbe084f99d25c330827efef9c5
Contents?: true
Size: 1.98 KB
Versions: 3
Compression:
Stored size: 1.98 KB
Contents
module Celluloid module ZMQ # React to incoming 0MQ and Celluloid events. This is kinda sorta supposed # to resemble the Reactor design pattern. class Reactor def initialize(waker) @waker = waker @poller = ::ZMQ::Poller.new @readers = {} @writers = {} rc = @poller.register @waker.socket, ::ZMQ::POLLIN unless ::ZMQ::Util.resultcode_ok? rc raise "0MQ poll error: #{::ZMQ::Util.error_string}" end end # Wait for the given ZMQ socket to become readable def wait_readable(socket) monitor_zmq socket, @readers, ::ZMQ::POLLIN end # Wait for the given ZMQ socket to become writeable def wait_writeable(socket) monitor_zmq socket, @writers, ::ZMQ::POLLOUT end # Monitor the given ZMQ socket with the given options def monitor_zmq(socket, set, type) if set.has_key? socket raise ArgumentError, "another method is already waiting on #{socket.inspect}" else set[socket] = Fiber.current end @poller.register socket, type Fiber.yield @poller.deregister socket, type socket end # Run the reactor, waiting for events, and calling the given block if # the reactor is awoken by the waker def run_once(timeout = nil) if timeout timeout *= 1000 # Poller uses millisecond increments else timeout = :blocking end rc = @poller.poll(timeout) unless ::ZMQ::Util.resultcode_ok? rc raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}" end @poller.readables.each do |sock| if sock == @waker.socket yield else fiber = @readers.delete sock fiber.resume if fiber end end @poller.writables.each do |sock| fiber = @writers.delete sock fiber.resume if fiber end end end end end
Version data entries
3 entries across 3 versions & 2 rubygems
Version | Path |
---|---|
dcell-0.7.1 | celluloid-zmq/lib/celluloid/zmq/reactor.rb |
celluloid-zmq-0.7.0 | lib/celluloid/zmq/reactor.rb |
celluloid-zmq-0.0.4 | lib/celluloid/zmq/reactor.rb |