lib/celluloid/zmq/reactor.rb in celluloid-zmq-0.17.0 vs lib/celluloid/zmq/reactor.rb in celluloid-zmq-0.17.2
- old
+ new
@@ -1,11 +1,10 @@
module Celluloid
module ZMQ
# React to incoming 0MQ and Celluloid events. This is kinda sorta supposed
# to resemble the Reactor design pattern.
class Reactor
-
extend Forwardable
def_delegator :@waker, :signal, :wakeup
def_delegator :@waker, :cleanup, :shutdown
def_delegator ZMQ, :result_ok?
@@ -14,13 +13,11 @@
@poller = ::ZMQ::Poller.new
@readers = {}
@writers = {}
rc = @poller.register @waker.socket, ::ZMQ::POLLIN
- unless result_ok? rc
- raise "0MQ poll error: #{::ZMQ::Util.error_string}"
- end
+ fail "0MQ poll error: #{::ZMQ::Util.error_string}" unless result_ok? rc
end
# Wait for the given ZMQ socket to become readable
def wait_readable(socket)
monitor_zmq socket, @readers, ::ZMQ::POLLIN
@@ -31,12 +28,12 @@
monitor_zmq socket, @writers, ::ZMQ::POLLOUT
end
# Monitor the given ZMQ socket with the given options
def monitor_zmq(socket, set, type)
- if set.has_key? socket
- raise ArgumentError, "another method is already waiting on #{socket.inspect}"
+ if set.key? socket
+ fail ArgumentError, "another method is already waiting on #{socket.inspect}"
else
set[socket] = Task.current
end
@poller.register socket, type
@@ -55,10 +52,10 @@
end
rc = @poller.poll(timeout)
unless result_ok? rc
- raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}"
+ fail IOError, "0MQ poll error: #{::ZMQ::Util.error_string}"
end
@poller.readables.each do |sock|
if sock == @waker.socket
@waker.wait