module ZMQ # Recommended to use the default for +io_threads+ # since most programs will not saturate I/O. # # The rule of thumb is to make +io_threads+ equal to the number # gigabits per second that the application will produce. # # The +io_threads+ number specifies the size of the thread pool # allocated by 0mq for processing incoming/outgoing messages. # # Returns a context object when allocation succeeds. It's necessary # for passing to the # #Socket constructor when allocating new sockets. All sockets # live within a context. # # Also, Sockets should *only* be accessed from the thread where they # were first created. Do *not* pass sockets between threads; pass # in the context and allocate a new socket per thread. If you must # use threads, then make sure to execute a full memory barrier (e.g. # mutex) as you pass a socket from one thread to the next. # # To connect sockets between contexts, use +inproc+ or +ipc+ # transport and set up a 0mq socket between them. This is also the # recommended technique for allowing sockets to communicate between # threads. # # context = ZMQ::Context.create # if context # socket = context.socket(ZMQ::REQ) # if socket # ... # else # STDERR.puts "Socket allocation failed" # end # else # STDERR.puts "Context allocation failed" # end # # class Context attr_reader :context, :io_threads, :max_sockets alias :pointer :context # Use the factory method Context#create to make contexts. # def self.create(opts = {}) new(opts) rescue nil end def initialize(opts = {}) if opts.respond_to?(:empty?) @io_threads = opts[:io_threads] || IO_THREADS_DFLT @max_sockets = opts[:max_sockets] || MAX_SOCKETS_DFLT else @io_threads = opts || 1 @max_sockets = MAX_SOCKETS_DFLT end @context = LibZMQ.zmq_ctx_new ZMQ::Util.error_check 'zmq_ctx_new', (@context.nil? || @context.null?) ? -1 : 0 rc = LibZMQ.zmq_ctx_set(@context, ZMQ::IO_THREADS, @io_threads) ZMQ::Util.error_check 'zmq_ctx_set', rc rc = LibZMQ.zmq_ctx_set(@context, ZMQ::MAX_SOCKETS, @max_sockets) ZMQ::Util.error_check 'zmq_ctx_set', rc define_finalizer end # Call to release the context and any remaining data associated # with past sockets. This will close any sockets that remain # open; further calls to those sockets will return -1 to indicate # the operation failed. # # Returns 0 for success, -1 for failure. # def terminate unless @context.nil? || @context.null? remove_finalizer rc = LibZMQ.terminate_context(@context) @context = nil rc || 0 else 0 end end # Short-cut to allocate a socket for a specific context. # # Takes several +type+ values: # #ZMQ::REQ # #ZMQ::REP # #ZMQ::PUB # #ZMQ::SUB # #ZMQ::PAIR # #ZMQ::PULL # #ZMQ::PUSH # #ZMQ::DEALER # #ZMQ::ROUTER # # Returns a #ZMQ::Socket when the allocation succeeds, nil # if it fails. # def socket type sock = nil begin sock = Socket.new @context, type rescue ContextError => e sock = nil end sock end private def define_finalizer ObjectSpace.define_finalizer(self, self.class.close(@context, Process.pid)) end def remove_finalizer ObjectSpace.undefine_finalizer self end def self.close context, pid Proc.new { LibZMQ.zmq_ctx_term context if !context.null? && Process.pid == pid } end end end # module ZMQ