lib/0mq/poll_interruptible.rb in 0mq-0.5.2 vs lib/0mq/poll_interruptible.rb in 0mq-0.5.3

- old
+ new

@@ -6,86 +6,117 @@ # Creates the additional interruption objects and calls super # Note that either #kill or #close MUST be called when done with the object. # There is no automatic finalizer for this object. def initialize(*sockets) - @int_sock_rep = ZMQ::Socket.new ZMQ::REP - @int_sock_req = ZMQ::Socket.new ZMQ::REQ + @int_sock_rep = ZMQ::Socket.new ZMQ::REP + @int_sock_req = ZMQ::Socket.new ZMQ::REQ + @int_sock_push = ZMQ::Socket.new ZMQ::PUSH + @int_sock_pull = ZMQ::Socket.new ZMQ::PULL # Choose an endpoint name that we can expect to be unique # so that they can be shared within the DefaultContext int_endpoint = "inproc://__PollInterruptible_int_"+hash.to_s(26) - @int_sock_rep.bind int_endpoint - @int_sock_req.connect int_endpoint + @int_sock_rep.bind int_endpoint+'R' + @int_sock_req.connect int_endpoint+'R' + @int_lock_req = Mutex.new + @int_sock_pull.bind int_endpoint+'P' + @int_sock_push.connect int_endpoint+'P' + @int_lock_push = Mutex.new + # Interruption blocks are stored here by key until #run receives them. # After each is run, the return value is stored here in its place. @interruptions = {} @dead = false - super @int_sock_rep, *sockets + super @int_sock_rep, @int_sock_pull, *sockets end # Same as Poll#run, but will yield [nil, nil] to the block if interrupted. # Return value may be an empty hash if the poller was killed. def run(&block) raise "#{self} cannot run; it was permanently killed." if @dead super do |socket, revents| - if socket == @int_sock_rep + if socket == @int_sock_rep || socket == @int_sock_pull key, * = socket.recv_array kill = key == "KILL" + blocking = socket == @int_sock_rep # Call the user block of #interrupt and store the return value - @interruptions[key] = @interruptions[key].call unless kill + unless kill + result = @interruptions[key].call + blocking ? @interruptions[key] = result : @interruptions.delete(key) + end # Call the user block of #run block.call nil, nil if block - socket.send_array ["OKAY"] + # Send a response if the interruption was blocking + socket.send_array ["OKAY"] if blocking if kill @int_sock_rep.close + @int_sock_pull.close @dead = true end else block.call socket, revents if block end - end.tap { |hash| hash.delete @int_sock_rep } + end.tap do |hash| + hash.delete @int_sock_rep + hash.delete @int_sock_pull + end end # Interrupt the running poll loop, but do not clean up. # This should be run anytime to let the poller re-evaluate state, etc.. # This should only be accessed from a thread other than the poll thread, # and only if the poll thread is running # If a block is given, it will be executed in the poll thread just # prior to the execution of the user block passed to {#run}. - def interrupt(&block) + # If the blocking: false option is is given, the call will not wait for + # the interruption to be processed. + def interrupt(opts={}, &block) + blocking = opts.fetch :blocking, true block ||= Proc.new { true } + # block = block.dup key = block.object_id.to_s 36 @interruptions[key] = block # Store the block to be called - @int_sock_req.send_string key # Signal an interruption to #run - @int_sock_req.recv_array # Wait until it has been handled by #run - - @interruptions.delete key # Return the stored result of the block + if blocking + @int_lock_req.synchronize { + @int_sock_req.send_string key # Signal an interruption to #run + @int_sock_req.recv_array # Wait until it has been handled by #run + } + return @interruptions.delete key # Return the stored result of the block + else + @int_lock_push.synchronize { + @int_sock_push.send_string key # Signal an interruption to #run + } + return nil + end end # Interrupt the running poll loop and permanently kill the Poll object # This should be run once, when the Poll object is no longer needed. # This should only be accessed from a thread other than the poll thread, # and only if the poll thread is running # Use #cleanup instead when there is no poll loop thread running. def kill return nil if @dead - @int_sock_req.send_array ["KILL"] - @int_sock_req.recv_array + @int_lock_req.synchronize do + @int_sock_req.send_array ["KILL"] + @int_sock_req.recv_array + end @int_sock_req.close + @int_sock_push.close true end # Permanently kill the Poll object @@ -95,9 +126,11 @@ def close return nil if @dead @int_sock_rep.close @int_sock_req.close + @int_sock_pull.close + @int_sock_push.close @dead = true end # Return true if the object has been killed or closed and cannot be run