Sha256: 7e1ff983cc132415b4bcb3d66e11923bd30dfcf85e99f79767f21ddf55b67390

Contents?: true

Size: 1.88 KB

Versions: 2

Compression:

Stored size: 1.88 KB

Contents

module Celluloid
  module ZMQ
    # React to incoming 0MQ and Celluloid events. This is kinda sorta supposed
    # to resemble the Reactor design pattern.
    class Reactor
      def initialize(waker)
        @waker = waker
        @poller = ::ZMQ::Poller.new
        @readers = {}
        @writers = {}

        rc = @poller.register @waker.socket, ::ZMQ::POLLIN
        unless ::ZMQ::Util.resultcode_ok? rc
          raise "0MQ poll error: #{::ZMQ::Util.error_string}"
        end
      end

      # Wait for the given ZMQ socket to become readable
      def wait_readable(socket)
        monitor_zmq socket, @readers, ::ZMQ::POLLIN
      end

      # Wait for the given ZMQ socket to become writeable
      def wait_writeable(socket)
        monitor_zmq socket, @writers, ::ZMQ::POLLOUT
      end

      # Monitor the given ZMQ socket with the given options
      def monitor_zmq(socket, set, type)
        if set.has_key? socket
          raise ArgumentError, "another method is already waiting on #{socket.inspect}"
        else
          set[socket] = Fiber.current
        end

        @poller.register socket, type
        Fiber.yield

        @poller.deregister socket, type
        socket
      end

      # Run the reactor, waiting for events, and calling the given block if
      # the reactor is awoken by the waker
      def run_once(timeout = nil)
        timeout ||= :blocking
        rc = @poller.poll(timeout)

        unless ::ZMQ::Util.resultcode_ok? rc
          raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}"
        end

        @poller.readables.each do |sock|
          if sock == @waker.socket
            yield
          else
            fiber = @readers.delete sock
            fiber.resume if fiber
          end
        end

        @poller.writables.each do |sock|
          fiber = @writers.delete sock
          fiber.resume if fiber
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
celluloid-zmq-0.0.3 lib/celluloid/zmq/reactor.rb
celluloid-zmq-0.0.2 lib/celluloid/zmq/reactor.rb