lib/0mq/poll_interruptible.rb in 0mq-0.4.1 vs lib/0mq/poll_interruptible.rb in 0mq-0.5.0
- old
+ new
@@ -15,41 +15,58 @@
# so that they can be shared within the DefaultContext
int_endpoint = "inproc://__PollInterruptible_int_"+hash.to_s(26)
@int_sock_rep.bind int_endpoint
@int_sock_req.connect int_endpoint
+ # Interruption blocks are stored here by key until #run receives them.
+ # After each is run, the return value is stored here in its place.
+ @interruptions = {}
+
@dead = false
super @int_sock_rep, *sockets
end
- # Same as Poll#run, but will yield [nil, nil] to the block if interrupted
+ # Same as Poll#run, but will yield [nil, nil] to the block if interrupted.
+ # Return value may be an empty hash if the poller was killed.
def run(&block)
raise "#{self} cannot run; it was permanently killed." if @dead
super do |socket, revents|
if socket == @int_sock_rep
- result = socket.recv_array
+ key, * = socket.recv_array
+ kill = key == "KILL"
+ # Call the user block of #interrupt and store the return value
+ @interruptions[key] = @interruptions[key].call unless kill
+
+ # Call the user block of #run
block.call nil, nil if block
socket.send_array ["OKAY"]
- @int_sock_rep.close if result == ["KILL"]
+ @int_sock_rep.close if kill
else
block.call socket, revents if block
end
end.tap { |hash| hash.delete @int_sock_rep }
end
# Interrupt the running poll loop, but do not clean up.
# This should be run anytime to let the poller re-evaluate state, etc..
# This should only be accessed from a thread other than the poll thread,
# and only if the poll thread is running
- def interrupt
- @int_sock_req.send_string ""
- @int_sock_req.recv_array
+ # If a block is given, it will be executed in the poll thread just
+ # prior to the execution of the user block passed to {#run}.
+ def interrupt(&block)
+ block ||= Proc.new { true }
+ key = block.object_id.to_s 36
- true
+ @interruptions[key] = block # Store the block to be called
+
+ @int_sock_req.send_string key # Signal an interruption to #run
+ @int_sock_req.recv_array # Wait until it has been handled by #run
+
+ @interruptions.delete key # Return the stored result of the block
end
# Interrupt the running poll loop and permanently kill the Poll object
# This should be run once, when the Poll object is no longer needed.
# This should only be accessed from a thread other than the poll thread,