lib/0mq/context.rb in 0mq-0.5.1 vs lib/0mq/context.rb in 0mq-0.5.2
- old
+ new
@@ -7,21 +7,70 @@
# The FFI pointer to the context.
attr_reader :pointer
def initialize
+ # FFI socket pointer for this context
@pointer = LibZMQ.zmq_ctx_new
+
+ # List of FFI socket pointers associated with this context.
+ # Each Socket is responsible for registering and unregistering
+ # its pointer from the Context it is associated with.
+ # See #register_socket_pointer and #unregister_socket_pointer,
+ # as well as #terminate and self.finalizer (where they get closed)
+ @socket_pointers = Array.new
+ @socket_pointers_mutex = Mutex.new
+
+ ObjectSpace.define_finalizer self,
+ self.class.finalizer(@pointer, @socket_pointers, Process.pid)
end
+ # @api private
+ def register_socket_pointer pointer
+ @socket_pointers_mutex.synchronize do
+ @socket_pointers.push pointer
+ end
+ end
+
+ # @api private
+ def unregister_socket_pointer pointer
+ @socket_pointers_mutex.synchronize do
+ @socket_pointers.delete pointer
+ end
+ end
+
# Destroy the ØMQ context.
def terminate
if @pointer
- rc = LibZMQ.version4? ?
- LibZMQ.zmq_ctx_term(@pointer) :
- LibZMQ.zmq_term(@pointer)
- ZMQ.error_check true if rc == -1
+ ObjectSpace.undefine_finalizer self
+ rc = LibZMQ.respond_to?(:zmq_ctx_term) ?
+ LibZMQ.zmq_ctx_term(pointer) :
+ LibZMQ.zmq_term(pointer)
+ ZMQ.error_check true if rc==-1
+
@pointer = nil
+ end
+ end
+
+ # Create a safe finalizer for the context pointer to terminate on GC
+ def self.finalizer(pointer, socket_pointers, pid)
+ Proc.new do
+ if Process.pid == pid
+ # Close all socket pointers associated with this context.
+ #
+ # If any of these sockets are still open when zmq_ctx_term is called,
+ # the call will hang. This is problematic, as the execution of
+ # finalizers is not multithreaded, and the order of finalizers is not
+ # guaranteed. Even when the Sockets each hold a reference to the
+ # Context, the Context could still be GCed first, causing lockup.
+ socket_pointers.each { |ptr| LibZMQ.zmq_close ptr }
+ socket_pointers.clear
+
+ LibZMQ.respond_to?(:zmq_ctx_term) ?
+ LibZMQ.zmq_ctx_term(pointer) :
+ LibZMQ.zmq_term(pointer)
+ end
end
end
# Create a Socket within this context.
def socket(type, opts={})