Sha256: d5fc0800b4a27d90f60b77555621abf1a2ff660458403330e9f738b8e84ba75d
Contents?: true
Size: 1.63 KB
Versions: 1
Compression:
Stored size: 1.63 KB
Contents
module Celluloid module ZMQ # You can't wake the dead DeadWakerError = Class.new IOError # Wakes up sleepy threads so that they can check their mailbox # Works like a ConditionVariable, except it's implemented as a ZMQ socket # so that it can be multiplexed alongside other ZMQ sockets class Waker extend Forwardable def_delegator ZMQ, :result_ok? PAYLOAD = "\0" # the payload doesn't matter, it's just a signal def initialize @sender = ZMQ.context.socket(::ZMQ::PAIR) @receiver = ZMQ.context.socket(::ZMQ::PAIR) @addr = "inproc://waker-#{object_id}" @sender.bind @addr @receiver.connect @addr @sender_lock = Mutex.new end # Wakes up the thread that is waiting for this Waker def signal @sender_lock.synchronize do unless result_ok? @sender.send_string PAYLOAD raise DeadWakerError, "error sending 0MQ message: #{::ZMQ::Util.error_string}" end end end # 0MQ socket to wait for messages on def socket @receiver end # Wait for another thread to signal this Waker def wait message = '' rc = @receiver.recv_string message unless result_ok? rc and message == PAYLOAD raise DeadWakerError, "error receiving ZMQ string: #{::ZMQ::Util.error_string}" end end # Clean up the IO objects associated with this waker def cleanup @sender_lock.synchronize { @sender.close rescue nil } @receiver.close rescue nil nil end alias_method :shutdown, :cleanup end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
celluloid-zmq-0.17.0 | lib/celluloid/zmq/waker.rb |