Sha256: 1b14439fc4a3e18921ea43eca0375333dc95b917f3f0e54711c48d97d67913c5

Contents?: true

Size: 1.6 KB

Versions: 3

Compression:

Stored size: 1.6 KB

Contents

module Celluloid
  module ZMQ
    # You can't wake the dead
    DeadWakerError = Class.new IOError

    # Wakes up sleepy threads so that they can check their mailbox
    # Works like a ConditionVariable, except it's implemented as a ZMQ socket
    # so that it can be multiplexed alongside other ZMQ sockets
    class Waker
      PAYLOAD = "\0" # the payload doesn't matter, it's just a signal

      def initialize
        @sender   = ZMQ.context.socket(::ZMQ::PAIR)
        @receiver = ZMQ.context.socket(::ZMQ::PAIR)

        @addr = "inproc://waker-#{object_id}"
        @sender.bind @addr
        @receiver.connect @addr

        @sender_lock = Mutex.new
      end

      # Wakes up the thread that is waiting for this Waker
      def signal
        @sender_lock.synchronize do
          unless ::ZMQ::Util.resultcode_ok? @sender.send_string PAYLOAD
            raise DeadWakerError, "error sending 0MQ message: #{::ZMQ::Util.error_string}"
          end
        end
      end

      # 0MQ socket to wait for messages on
      def socket
        @receiver
      end

      # Wait for another thread to signal this Waker
      def wait
        message = ''
        rc = @receiver.recv_string message

        unless ::ZMQ::Util.resultcode_ok? rc and message == PAYLOAD
          raise DeadWakerError, "error receiving ZMQ string: #{::ZMQ::Util.error_string}"
        end
      end

      # Clean up the IO objects associated with this waker
      def cleanup
        @sender_lock.synchronize { @sender.close rescue nil }
        @receiver.close rescue nil
        nil
      end
      alias_method :shutdown, :cleanup
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
celluloid-zmq-0.16.1 lib/celluloid/zmq/waker.rb
celluloid-zmq-0.16.0 lib/celluloid/zmq/waker.rb
celluloid-zmq-0.16.0.pre lib/celluloid/zmq/waker.rb