Sha256: 2a62a1bf30743d4aec64510c7c55118afecb29fbe084f99d25c330827efef9c5

Contents?: true

Size: 1.98 KB

Versions: 3

Compression:

Stored size: 1.98 KB

Contents

module Celluloid
  module ZMQ
    # React to incoming 0MQ and Celluloid events. This is kinda sorta supposed
    # to resemble the Reactor design pattern.
    #
    # A fiber parks itself with #wait_readable or #wait_writeable; #run_once
    # polls every registered socket and resumes the fiber that is waiting on
    # each socket reported as ready.
    class Reactor
      # Build a reactor and register the waker's socket for read events.
      #
      # @param waker [#socket] object exposing the socket that #run_once
      #   treats as the wake-up signal (presumably written to by other
      #   threads -- confirm against the waker implementation)
      # @raise [RuntimeError] if registering the waker socket fails
      def initialize(waker)
        @waker = waker
        @poller = ::ZMQ::Poller.new
        @readers = {} # socket => fiber waiting for it to become readable
        @writers = {} # socket => fiber waiting for it to become writable

        rc = @poller.register @waker.socket, ::ZMQ::POLLIN
        unless ::ZMQ::Util.resultcode_ok? rc
          raise "0MQ poll error: #{::ZMQ::Util.error_string}"
        end
      end

      # Wait for the given ZMQ socket to become readable
      #
      # @param socket the 0MQ socket to watch
      # @return the same socket, once #run_once has resumed the caller
      # @raise [ArgumentError] if another fiber already waits to read it
      def wait_readable(socket)
        monitor_zmq socket, @readers, ::ZMQ::POLLIN
      end

      # Wait for the given ZMQ socket to become writeable
      #
      # @param socket the 0MQ socket to watch
      # @return the same socket, once #run_once has resumed the caller
      # @raise [ArgumentError] if another fiber already waits to write it
      def wait_writeable(socket)
        monitor_zmq socket, @writers, ::ZMQ::POLLOUT
      end

      # Monitor the given ZMQ socket with the given options
      #
      # Records the current fiber as the waiter for +socket+, registers the
      # socket with the poller and suspends the fiber. Must be called from
      # inside a fiber that #run_once will later resume.
      #
      # @param socket the 0MQ socket to watch
      # @param set [Hash] waiter table to use (@readers or @writers)
      # @param type poll event flag (::ZMQ::POLLIN or ::ZMQ::POLLOUT)
      # @return the socket that was waited on
      # @raise [ArgumentError] if +set+ already holds a waiter for +socket+
      def monitor_zmq(socket, set, type)
        if set.key? socket
          raise ArgumentError, "another method is already waiting on #{socket.inspect}"
        end

        waiter = Fiber.current
        set[socket] = waiter
        registered = false

        begin
          @poller.register socket, type
          registered = true
          Fiber.yield
        ensure
          # run_once removes the waiter entry before resuming us, so the entry
          # is only still ours when we unwind abnormally (e.g. register
          # raised). Dropping it keeps the socket usable for later waits.
          set.delete socket if set[socket].equal? waiter
          @poller.deregister socket, type if registered
        end

        socket
      end

      # Run the reactor, waiting for events, and calling the given block if
      # the reactor is awoken by the waker
      #
      # @param timeout [Numeric, nil] seconds to wait; nil blocks until an
      #   event arrives
      # @yield when the waker's socket is reported readable; a block is
      #   required in that case
      # @raise [IOError] if polling fails
      def run_once(timeout = nil)
        # Poller uses millisecond increments
        poll_timeout = timeout ? timeout * 1000 : :blocking

        rc = @poller.poll(poll_timeout)
        unless ::ZMQ::Util.resultcode_ok? rc
          raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}"
        end

        # Iterate over snapshots: every resumed fiber deregisters its socket
        # from the poller straight away, and we must not skip ready sockets
        # should the poller adjust its ready lists in response.
        @poller.readables.dup.each do |sock|
          if sock == @waker.socket
            yield
          else
            resume_waiter @readers, sock
          end
        end

        @poller.writables.dup.each { |sock| resume_waiter @writers, sock }
      end

      private

      # Remove and resume the fiber waiting on +socket+ in +set+, if any.
      def resume_waiter(set, socket)
        fiber = set.delete socket
        fiber.resume if fiber
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 2 rubygems

Version Path
dcell-0.7.1 celluloid-zmq/lib/celluloid/zmq/reactor.rb
celluloid-zmq-0.7.0 lib/celluloid/zmq/reactor.rb
celluloid-zmq-0.0.4 lib/celluloid/zmq/reactor.rb