Sha256: ea9c68c3095065776a90a146c478f62b0d6b6f397c59bc6b11a85976d6ebdc00

Contents?: true

Size: 1.58 KB

Versions: 2

Compression:

Stored size: 1.58 KB

Contents

module Celluloid
  module ZMQ
    class DeadWakerError < Celluloid::IO::DeadWakerError; end # You can't wake the dead

    # Wakes up sleepy threads so that they can check their mailbox
    # Works like a ConditionVariable, except it's implemented as a ZMQ socket
    # so that it can be multiplexed alongside other ZMQ sockets
    class Waker
      PAYLOAD = "\0" # the payload doesn't matter, it's just a signal

      def initialize
        @sender   = ZMQ.context.socket(::ZMQ::PAIR)
        @receiver = ZMQ.context.socket(::ZMQ::PAIR)

        @addr = "inproc://waker-#{object_id}"
        @sender.bind @addr
        @receiver.connect @addr

        @sender_lock = Mutex.new
      end

      # Wakes up the thread that is waiting for this Waker
      def signal
        @sender_lock.synchronize do
          unless ::ZMQ::Util.resultcode_ok? @sender.send_string PAYLOAD
            raise DeadWakerError, "error sending 0MQ message: #{::ZMQ::Util.error_string}"
          end
        end
      end

      # 0MQ socket to wait for messages on
      def socket
        @receiver
      end

      # Wait for another thread to signal this Waker
      def wait
        message = ''
  	    rc = @receiver.recv_string message

  	    unless ::ZMQ::Util.resultcode_ok? rc and message == PAYLOAD
  	      raise DeadWakerError, "error receiving ZMQ string: #{::ZMQ::Util.error_string}"
  	    end
      end

      # Clean up the IO objects associated with this waker
      def cleanup
        @sender_lock.synchronize { @sender.close rescue nil }
        @receiver.close rescue nil
        nil
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
celluloid-zmq-0.0.3 lib/celluloid/zmq/waker.rb
celluloid-zmq-0.0.2 lib/celluloid/zmq/waker.rb