Sha256: e189870287b045657d8610e23198baa5dcde191cf21657c491c13a4ca3954d7e
Contents?: true
Size: 1.9 KB
Versions: 4
Compression:
Stored size: 1.9 KB
Contents
require 'nio' module Celluloid module IO # React to external I/O events. This is kinda sorta supposed to resemble the # Reactor design pattern. class Reactor extend Forwardable # Unblock the reactor (i.e. to signal it from another thread) def_delegator :@selector, :wakeup # Terminate the reactor def_delegator :@selector, :close, :shutdown def initialize @selector = NIO::Selector.new end # Wait for the given IO object to become readable def wait_readable(io) wait io, :r end # Wait for the given IO object to become writable def wait_writable(io) wait io, :w end # Wait for the given IO operation to complete def wait(io, set) # zomg ugly type conversion :( unless io.is_a?(::IO) or io.is_a?(OpenSSL::SSL::SSLSocket) if io.respond_to? :to_io io = io.to_io elsif ::IO.respond_to? :try_convert io = ::IO.try_convert(io) end raise TypeError, "can't convert #{io.class} into IO" unless io.is_a?(::IO) end monitor = @selector.register(io, set) monitor.value = Task.current begin Task.suspend :iowait ensure # In all cases we want to ensure that the monitor is closed once we # have woken up. However, in some cases, the monitor is already # invalid, e.g. in the case that we are terminating. We catch this # case explicitly. monitor.close end end # Run the reactor, waiting for events or wakeup signal def run_once(timeout = nil) @selector.select(timeout) do |monitor| task = monitor.value if task.running? task.resume else Logger.warn("reactor attempted to resume a dead task") end end end end end end
Version data entries
4 entries across 4 versions & 1 rubygems