lib/0mq/poll_interruptible.rb in 0mq-0.4.1 vs lib/0mq/poll_interruptible.rb in 0mq-0.5.0

- old
+ new

@@ -15,41 +15,58 @@ # so that they can be shared within the DefaultContext int_endpoint = "inproc://__PollInterruptible_int_"+hash.to_s(26) @int_sock_rep.bind int_endpoint @int_sock_req.connect int_endpoint + # Interruption blocks are stored here by key until #run receives them. + # After each is run, the return value is stored here in its place. + @interruptions = {} + @dead = false super @int_sock_rep, *sockets end - # Same as Poll#run, but will yield [nil, nil] to the block if interrupted + # Same as Poll#run, but will yield [nil, nil] to the block if interrupted. + # Return value may be an empty hash if the poller was killed. def run(&block) raise "#{self} cannot run; it was permanently killed." if @dead super do |socket, revents| if socket == @int_sock_rep - result = socket.recv_array + key, * = socket.recv_array + kill = key == "KILL" + # Call the user block of #interrupt and store the return value + @interruptions[key] = @interruptions[key].call unless kill + + # Call the user block of #run block.call nil, nil if block socket.send_array ["OKAY"] - @int_sock_rep.close if result == ["KILL"] + @int_sock_rep.close if kill else block.call socket, revents if block end end.tap { |hash| hash.delete @int_sock_rep } end # Interrupt the running poll loop, but do not clean up. # This should be run anytime to let the poller re-evaluate state, etc.. # This should only be accessed from a thread other than the poll thread, # and only if the poll thread is running - def interrupt - @int_sock_req.send_string "" - @int_sock_req.recv_array + # If a block is given, it will be executed in the poll thread just + # prior to the execution of the user block passed to {#run}. + def interrupt(&block) + block ||= Proc.new { true } + key = block.object_id.to_s 36 - true + @interruptions[key] = block # Store the block to be called + + @int_sock_req.send_string key # Signal an interruption to #run + @int_sock_req.recv_array # Wait until it has been handled by #run + + @interruptions.delete key # Return the stored result of the block end # Interrupt the running poll loop and permanently kill the Poll object # This should be run once, when the Poll object is no longer needed. # This should only be accessed from a thread other than the poll thread,