lib/ffi-rzmq/socket.rb in ffi-rzmq-0.8.2 vs lib/ffi-rzmq/socket.rb in ffi-rzmq-0.9.0

- old
+ new

@@ -1,475 +1,985 @@ module ZMQ - ZMQ_SOCKET_STR = 'zmq_socket'.freeze unless defined? ZMQ_SOCKET_STR - ZMQ_SETSOCKOPT_STR = 'zmq_setsockopt'.freeze - ZMQ_GETSOCKOPT_STR = 'zmq_getsockopt'.freeze - ZMQ_BIND_STR = 'zmq_bind'.freeze - ZMQ_CONNECT_STR = 'zmq_connect'.freeze - ZMQ_CLOSE_STR = 'zmq_close'.freeze - ZMQ_SEND_STR = 'zmq_send'.freeze - ZMQ_RECV_STR = 'zmq_recv'.freeze - - class Socket + module CommonSocketBehavior include ZMQ::Util attr_reader :socket, :name # Allocates a socket of type +type+ for sending and receiving data. # # +type+ can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, - # ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, + # ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP, # ZMQ::DEALER or ZMQ::ROUTER. # # By default, this class uses ZMQ::Message for manual # memory management. For automatic garbage collection of received messages, # it is possible to override the :receiver_class to use ZMQ::ManagedMessage. # + # sock = Socket.create(Context.create, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage) + # + # Advanced users may want to replace the receiver class with their + # own custom class. The custom class must conform to the same public API + # as ZMQ::Message. + # + # Creation of a new Socket object can return nil when socket creation + # fails. + # + # if (socket = Socket.new(context.pointer, ZMQ::REQ)) + # ... + # else + # STDERR.puts "Socket creation failed" + # end + # + def self.create context_ptr, type, opts = {:receiver_class => ZMQ::Message} + new(context_ptr, type, opts) rescue nil + end + + # To avoid rescuing exceptions, use the factory method #create for + # all socket creation. + # + # Allocates a socket of type +type+ for sending and receiving data. + # + # +type+ can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, + # ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP, + # ZMQ::DEALER or ZMQ::ROUTER. + # + # By default, this class uses ZMQ::Message for manual + # memory management. For automatic garbage collection of received messages, + # it is possible to override the :receiver_class to use ZMQ::ManagedMessage. + # # sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage) # # Advanced users may want to replace the receiver class with their # own custom class. The custom class must conform to the same public API # as ZMQ::Message. # - # Can raise two kinds of exceptions depending on the error. - # ContextError:: Raised when a socket operation is attempted on a terminated - # #Context. See #ContextError. - # SocketError:: See all of the possibilities in the docs for #SocketError. + # Creation of a new Socket object can raise an exception. This occurs when the + # +context_ptr+ is null or when the allocation of the 0mq socket within the + # context fails. # + # begin + # socket = Socket.new(context.pointer, ZMQ::REQ) + # rescue ContextError => e + # # error handling + # end + # def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message} # users may override the classes used for receiving; class must conform to the # same public API as ZMQ::Message @receiver_klass = opts[:receiver_class] + + context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context) unless context_ptr.null? @socket = LibZMQ.zmq_socket context_ptr, type - if @socket - error_check ZMQ_SOCKET_STR, @socket.null? ? 1 : 0 + if @socket && !@socket.null? @name = SocketTypeNameMap[type] else - raise ContextError.new ZMQ_SOCKET_STR, 0, ETERM, "Socket pointer was null" + raise ContextError.new 'zmq_socket', 0, ETERM, "Socket pointer was null" end else - raise ContextError.new ZMQ_SOCKET_STR, 0, ETERM, "Context pointer was null" + raise ContextError.new 'zmq_socket', 0, ETERM, "Context pointer was null" end @sockopt_cache = {} define_finalizer end # Set the queue options on this socket. # - # Valid +option_name+ values that take a numeric +option_value+ are: + # Valid +name+ values that take a numeric +value+ are: # ZMQ::HWM - # ZMQ::SWAP + # ZMQ::SWAP (version 2 only) # ZMQ::AFFINITY # ZMQ::RATE # ZMQ::RECOVERY_IVL - # ZMQ::MCAST_LOOP + # ZMQ::MCAST_LOOP (version 2 only) # ZMQ::LINGER # ZMQ::RECONNECT_IVL # ZMQ::BACKLOG - # ZMQ::RECOVER_IVL_MSEC + # ZMQ::RECOVER_IVL_MSEC (version 2 only) + # ZMQ::RECONNECT_IVL_MAX (version 3/4 only) + # ZMQ::MAXMSGSIZE (version 3/4 only) + # ZMQ::SNDHWM (version 3/4 only) + # ZMQ::RCVHWM (version 3/4 only) + # ZMQ::MULTICAST_HOPS (version 3/4 only) + # ZMQ::RCVTIMEO (version 3/4 only) + # ZMQ::SNDTIMEO (version 3/4 only) + # ZMQ::RCVLABEL (version 3/4 only) # - # Valid +option_name+ values that take a string +option_value+ are: - # ZMQ::IDENTITY + # Valid +name+ values that take a string +value+ are: + # ZMQ::IDENTITY (version 2/3 only) # ZMQ::SUBSCRIBE # ZMQ::UNSUBSCRIBE # - # Can raise two kinds of exceptions depending on the error. - # ContextError:: Raised when a socket operation is attempted on a terminated - # #Context. See #ContextError. - # SocketError:: See all of the possibilities in the docs for #SocketError. + # Returns 0 when the operation completed successfully. + # Returns -1 when this operation failed. # - def setsockopt option_name, option_value, option_len = nil - option_value = sanitize_value option_name, option_value + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + # rc = socket.setsockopt(ZMQ::LINGER, 1_000) + # ZMQ::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed") + # + def setsockopt name, value, length = nil + if long_long_option?(name) + length = 8 + pointer = LibC.malloc length + pointer.write_long_long value - begin - case option_name - when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC - option_len = 8 # all of these options are defined as int64_t or uint64_t - option_value_ptr = LibC.malloc option_len - option_value_ptr.write_long_long option_value + elsif int_option?(name) + length = 4 + pointer = LibC.malloc length + pointer.write_int value - when LINGER, RECONNECT_IVL, BACKLOG - option_len = 4 # hard-code "int" length to 4 bytes - option_value_ptr = LibC.malloc option_len - option_value_ptr.write_int option_value + elsif string_option?(name) + length ||= value.size - when IDENTITY, SUBSCRIBE, UNSUBSCRIBE - option_len ||= option_value.size + # note: not checking errno for failed memory allocations :( + pointer = LibC.malloc length + pointer.write_string value + end - # note: not checking errno for failed memory allocations :( - option_value_ptr = LibC.malloc option_len - option_value_ptr.write_string option_value + rc = LibZMQ.zmq_setsockopt @socket, name, pointer, length + LibC.free(pointer) unless pointer.nil? || pointer.null? + rc + end - else - # we didn't understand the passed option argument - # will force a raise due to EINVAL being non-zero - error_check ZMQ_SETSOCKOPT_STR, EINVAL - end - - result_code = LibZMQ.zmq_setsockopt @socket, option_name, option_value_ptr, option_len - error_check ZMQ_SETSOCKOPT_STR, result_code - ensure - LibC.free option_value_ptr unless option_value_ptr.nil? || option_value_ptr.null? - end + # Convenience method for checking on additional message parts. + # + # Equivalent to calling Socket#getsockopt with ZMQ::RCVMORE. + # + # Warning: if the call to #getsockopt fails, this method will return + # false and swallow the error. + # + # message_parts = [] + # message = Message.new + # rc = socket.recv(message) + # if ZMQ::Util.resultcode_ok?(rc) + # message_parts << message + # while more_parts? + # message = Message.new + # rc = socket.recv(message) + # message_parts.push(message) if resulcode_ok?(rc) + # end + # end + # + def more_parts? + array = [] + rc = getsockopt ZMQ::RCVMORE, array + + Util.resultcode_ok?(rc) ? array.at(0) : false end - # Get the options set on this socket. Returns a value dependent upon - # the +option_name+ requested. + # Binds the socket to an +address+. # - # Valid +option_name+ values and their return types: - # ZMQ::RCVMORE - boolean - # ZMQ::HWM - integer - # ZMQ::SWAP - integer - # ZMQ::AFFINITY - bitmap in an integer - # ZMQ::IDENTITY - string - # ZMQ::RATE - integer - # ZMQ::RECOVERY_IVL - integer - # ZMQ::MCAST_LOOP - boolean - # ZMQ::SNDBUF - integer - # ZMQ::RCVBUF - integer - # ZMQ::FD - fd in an integer - # ZMQ::EVENTS - bitmap integer - # ZMQ::LINGER - integer measured in milliseconds - # ZMQ::RECONNECT_IVL - integer measured in milliseconds - # ZMQ::BACKLOG - integer - # ZMQ::RECOVER_IVL_MSEC - integer measured in milliseconds + # socket.bind("tcp://127.0.0.1:5555") # - # Can raise two kinds of exceptions depending on the error. - # ContextError:: Raised when a socket operation is attempted on a terminated - # #Context. See #ContextError. - # SocketError:: See all of the possibilities in the docs for #SocketError. + def bind address + LibZMQ.zmq_bind @socket, address + end + + # Connects the socket to an +address+. # - def getsockopt option_name - begin - option_value = FFI::MemoryPointer.new :pointer - option_length = FFI::MemoryPointer.new(:size_t) rescue FFI::MemoryPointer.new(:ulong) + # socket.connect("tcp://127.0.0.1:5555") + # + def connect address + rc = LibZMQ.zmq_connect @socket, address + end - unless [ - TYPE, RCVMORE, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, IDENTITY, - SNDBUF, RCVBUF, FD, EVENTS, LINGER, RECONNECT_IVL, BACKLOG, RECOVERY_IVL_MSEC - ].include? option_name - # we didn't understand the passed option argument - # will force a raise - error_check ZMQ_SETSOCKOPT_STR, -1 + # Closes the socket. Any unprocessed messages in queue are sent or dropped + # depending upon the value of the socket option ZMQ::LINGER. + # + # Returns 0 upon success *or* when the socket has already been closed. + # Returns -1 when the operation fails. Check ZMQ.errno for the error code. + # + # rc = socket.close + # puts("Given socket was invalid!") unless 0 == rc + # + def close + if @socket + remove_finalizer + rc = LibZMQ.zmq_close @socket + @socket = nil + release_cache + rc + else + 0 end + end - option_value, option_length = alloc_temp_sockopt_buffers option_name - result_code = LibZMQ.zmq_getsockopt @socket, option_name, option_value, option_length - error_check ZMQ_GETSOCKOPT_STR, result_code - ret = 0 + private - case option_name - when RCVMORE, MCAST_LOOP - # boolean return - ret = option_value.read_long_long != 0 - when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC - ret = option_value.read_long_long - when TYPE, LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS - ret = option_value.read_int - when IDENTITY - ret = option_value.read_string(option_length.read_long_long) + def __getsockopt__ name, array + value, length = sockopt_buffers name + + rc = LibZMQ.zmq_getsockopt @socket, name, value, length + + if Util.resultcode_ok?(rc) + result = if int_option?(name) + value.read_int + elsif long_long_option?(name) + value.read_long_long + elsif string_option?(name) + value.read_string(length.read_int) + end + + array << result end - ret + rc end - end - # Convenience method for checking on additional message parts. - # - # Equivalent to Socket#getsockopt ZMQ::RCVMORE - # - def more_parts? - getsockopt ZMQ::RCVMORE - end + # Calls to ZMQ.getsockopt require us to pass in some pointers. We can cache and save those buffers + # for subsequent calls. This is a big perf win for calling RCVMORE which happens quite often. + # Cannot save the buffer for the IDENTITY. + def sockopt_buffers name + if long_long_option?(name) + # int64_t or uint64_t + unless @sockopt_cache[:int64] + length = FFI::MemoryPointer.new :size_t + length.write_int 8 + @sockopt_cache[:int64] = [FFI::MemoryPointer.new(:int64), length] + end + + @sockopt_cache[:int64] - # Convenience method for getting the value of the socket IDENTITY. - # - def identity - getsockopt ZMQ::IDENTITY - end + elsif int_option?(name) + # int, 0mq assumes int is 4-bytes + unless @sockopt_cache[:int32] + length = FFI::MemoryPointer.new :size_t + length.write_int 4 + @sockopt_cache[:int32] = [FFI::MemoryPointer.new(:int32), length] + end + + @sockopt_cache[:int32] - # Convenience method for setting the value of the socket IDENTITY. - # - def identity= value - setsockopt ZMQ::IDENTITY, value.to_s - end + elsif string_option?(name) + length = FFI::MemoryPointer.new :size_t + # could be a string of up to 255 bytes + length.write_int 255 + [FFI::MemoryPointer.new(255), length] + + else + # uh oh, someone passed in an unknown option; use a slop buffer + unless @sockopt_cache[:unknown] + length = FFI::MemoryPointer.new :size_t + length.write_int 4 + @sockopt_cache[:unknown] = [FFI::MemoryPointer.new(:int32), length] + end + + @sockopt_cache[:unknown] + end + end - # Can raise two kinds of exceptions depending on the error. - # ContextError:: Raised when a socket operation is attempted on a terminated - # #Context. See #ContextError. - # SocketError:: See all of the possibilities in the docs for #SocketError. - # - def bind address - result_code = LibZMQ.zmq_bind @socket, address - error_check ZMQ_BIND_STR, result_code - end + def supported_option? name + int_option?(name) || long_long_option?(name) || string_option?(name) + end - # Can raise two kinds of exceptions depending on the error. - # ContextError:: Raised when a socket operation is attempted on a terminated - # #Context. See #ContextError. - # SocketError:: See all of the possibilities in the docs for #SocketError. - # - def connect address - result_code = LibZMQ.zmq_connect @socket, address - error_check ZMQ_CONNECT_STR, result_code - end + def int_option? name + EVENTS == name || + LINGER == name || + RECONNECT_IVL == name || + FD == name || + TYPE == name || + BACKLOG == name + end - # Closes the socket. Any unprocessed messages in queue are sent or dropped - # depending upon the value of the socket option ZMQ::LINGER. - # - def close - if @socket - remove_finalizer - result_code = LibZMQ.zmq_close @socket - error_check ZMQ_CLOSE_STR, result_code - @socket = nil - release_cache + def string_option? name + SUBSCRIBE == name || + UNSUBSCRIBE == name end - end - # Queues the message for transmission. Message is assumed to conform to the - # same public API as #Message. - # - # +flags+ may take two values: - # * 0 (default) - blocking operation - # * ZMQ::NOBLOCK - non-blocking operation - # * ZMQ::SNDMORE - this message is part of a multi-part message - # - # Returns true when the message was successfully enqueued. - # Returns false under two conditions. - # 1. The message could not be enqueued - # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. - # - # The application code is responsible for handling the +message+ object - # lifecycle when #send returns. - # - # Can raise two kinds of exceptions depending on the error. - # ContextError:: Raised when a socket operation is attempted on a terminated - # #Context. See #ContextError. - # SocketError:: See all of the possibilities in the docs for #SocketError. - # - def send message, flags = 0 - begin - result_code = LibZMQ.zmq_send @socket, message.address, flags + def long_long_option? name + RCVMORE == name || + AFFINITY == name + end - # when the flag isn't set, do a normal error check - # when set, check to see if the message was successfully queued - queued = noblock?(flags) ? error_check_nonblock(result_code) : error_check(ZMQ_SEND_STR, result_code) + def unsupported_setsock_option? name + RCVMORE == name end - # true if sent, false if failed/EAGAIN - queued - end + def unsupported_getsock_option? name + UNSUBSCRIBE == name || + SUBSCRIBE == name + end - # Helper method to make a new #Message instance out of the +message_string+ passed - # in for transmission. - # - # +flags+ may be ZMQ::NOBLOCK. - # - # Can raise two kinds of exceptions depending on the error. - # ContextError:: Raised when a socket operation is attempted on a terminated - # #Context. See #ContextError. - # SocketError:: See all of the possibilities in the docs for #SocketError. - # - def send_string message_string, flags = 0 - message = Message.new message_string - result_code = send_and_close message, flags + def release_cache + @sockopt_cache.clear + end + end # module CommonSocketBehavior - result_code - end - # Send a sequence of strings as a multipart message out of the +parts+ - # passed in for transmission. Every element of +parts+ should be - # a String. - # - # +flags+ may be ZMQ::NOBLOCK. - # - # Raises the same exceptions as Socket#send. - # - def send_strings parts, flags = 0 - return false if !parts || parts.empty? + module IdentitySupport - parts[0...-1].each do |part| - return false unless send_string part, flags | ZMQ::SNDMORE + # Convenience method for getting the value of the socket IDENTITY. + # + def identity + array = [] + getsockopt IDENTITY, array + array.at(0) end - send_string parts[-1], flags - end - - # Sends a message. This will automatically close the +message+ for both successful - # and failed sends. - # - # Raises the same exceptions as Socket#send - # - def send_and_close message, flags = 0 - begin - result_code = send message, flags - ensure - message.close + # Convenience method for setting the value of the socket IDENTITY. + # + def identity=(value) + setsockopt IDENTITY, value.to_s end - result_code - end - # Dequeues a message from the underlying queue. By default, this is a blocking operation. - # - # +flags+ may take two values: - # 0 (default) - blocking operation - # ZMQ::NOBLOCK - non-blocking operation - # - # Returns a true when it successfully dequeues one from the queue. Also, the +message+ - # object is populated by the library with a data buffer containing the received - # data. - # - # Returns nil when a message could not be dequeued *and* +flags+ is set - # with ZMQ::NOBLOCK. The +message+ object is not modified in this situation. - # - # The application code is *not* responsible for handling the +message+ object lifecycle - # when #recv raises an exception. The #recv method takes ownership of the - # +message+ and its associated buffers. A failed call will - # release the data buffers assigned to the +message+. - # - # Can raise two kinds of exceptions depending on the error. - # ContextError:: Raised when a socket operation is attempted on a terminated - # #Context. See #ContextError. - # SocketError:: See all of the possibilities in the docs for #SocketError. - # - def recv message, flags = 0 - begin - dequeued = _recv message, flags - rescue ZeroMQError - message.close - raise + + private + + def string_option? name + super || + IDENTITY == name end + end # module IdentitySupport - dequeued ? true : nil - end - # Helper method to make a new #Message instance and convert its payload - # to a string. - # - # +flags+ may be ZMQ::NOBLOCK. - # - # Can raise two kinds of exceptions depending on the error. - # ContextError:: Raised when a socket operation is attempted on a terminated - # #Context. See #ContextError. - # SocketError:: See all of the possibilities in the docs for #SocketError. - # - def recv_string flags = 0 - message = @receiver_klass.new + if LibZMQ.version2? - begin - dequeued = _recv message, flags + class Socket + # Inclusion order is *important* since later modules may have a call + # to #super. We want those calls to go up the chain in a particular + # order + include CommonSocketBehavior + include IdentitySupport - if dequeued - message.copy_out_string - else - nil + # Get the options set on this socket. + # + # +name+ determines the socket option to request + # +array+ should be an empty array; a result of the proper type + # (numeric, string, boolean) will be inserted into + # the first position. + # + # Valid +option_name+ values: + # ZMQ::RCVMORE - true or false + # ZMQ::HWM - integer + # ZMQ::SWAP - integer + # ZMQ::AFFINITY - bitmap in an integer + # ZMQ::IDENTITY - string + # ZMQ::RATE - integer + # ZMQ::RECOVERY_IVL - integer + # ZMQ::MCAST_LOOP - true or false + # ZMQ::SNDBUF - integer + # ZMQ::RCVBUF - integer + # ZMQ::FD - fd in an integer + # ZMQ::EVENTS - bitmap integer + # ZMQ::LINGER - integer measured in milliseconds + # ZMQ::RECONNECT_IVL - integer measured in milliseconds + # ZMQ::BACKLOG - integer + # ZMQ::RECOVER_IVL_MSEC - integer measured in milliseconds + # + # Returns 0 when the operation completed successfully. + # Returns -1 when this operation failed. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + # # retrieve high water mark + # array = [] + # rc = socket.getsockopt(ZMQ::HWM, array) + # hwm = array.first if ZMQ::Util.resultcode_ok?(rc) + # + def getsockopt name, array + rc = __getsockopt__ name, array + + if Util.resultcode_ok?(rc) && (RCVMORE == name || MCAST_LOOP == name) + # convert to boolean + array[0] = 1 == array[0] + end + + rc end - ensure - message.close - end - end - # Receive a multipart message as a list of strings. - # - # +flags+ may be ZMQ::NOBLOCK. - # - # Raises the same exceptions as Socket#recv. - # - def recv_strings flags = 0 - parts = [] - parts << recv_string(flags) - parts << recv_string(flags) while more_parts? - parts - end + # Queues the message for transmission. Message is assumed to conform to the + # same public API as #Message. + # + # +flags+ may take two values: + # * 0 (default) - blocking operation + # * ZMQ::NOBLOCK - non-blocking operation + # * ZMQ::SNDMORE - this message is part of a multi-part message + # + # Returns 0 when the message was successfully enqueued. + # Returns -1 under two conditions. + # 1. The message could not be enqueued + # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + # The application code is responsible for handling the +message+ object + # lifecycle when #send returns. Regardless of the return code, the user + # is responsible for calling message.close to free the memory in use. + # + def send message, flags = 0 + LibZMQ.zmq_send @socket, message.address, flags + end - private - - def noblock? flag - (NOBLOCK & flag) == NOBLOCK - end + # Helper method to make a new #Message instance out of the +message_string+ passed + # in for transmission. + # + # +flags+ may be ZMQ::NOBLOCK. + # + # Returns 0 when the message was successfully enqueued. + # Returns -1 under two conditions. + # 1. The message could not be enqueued + # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def send_string message_string, flags = 0 + message = Message.new message_string + send_and_close message, flags + end - def _recv message, flags = 0 - result_code = LibZMQ.zmq_recv @socket, message.address, flags + # Send a sequence of strings as a multipart message out of the +parts+ + # passed in for transmission. Every element of +parts+ should be + # a String. + # + # +flags+ may be ZMQ::NOBLOCK. + # + # Returns 0 when the message was successfully enqueued. + # Returns -1 under two conditions. + # 1. The message could not be enqueued + # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def send_strings parts, flags = 0 + return -1 if !parts || parts.empty? - if noblock?(flags) - error_check_nonblock(result_code) - else - error_check(ZMQ_RECV_STR, result_code) - end - end + parts[0..-2].each do |part| + rc = send_string part, flags | ZMQ::SNDMORE + return rc unless Util.resultcode_ok?(rc) + end - # Calls to ZMQ.getsockopt require us to pass in some pointers. We can cache and save those buffers - # for subsequent calls. This is a big perf win for calling RCVMORE which happens quite often. - # Cannot save the buffer for the IDENTITY. - def alloc_temp_sockopt_buffers option_name - case option_name - when RCVMORE, MCAST_LOOP, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC - # int64_t - unless @sockopt_cache[:int64] - length = FFI::MemoryPointer.new :int64 - length.write_long_long 8 - @sockopt_cache[:int64] = [FFI::MemoryPointer.new(:int64), length] + send_string parts[-1], flags end - @sockopt_cache[:int64] - when TYPE, LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS - # int, 0mq assumes int is 4-bytes - unless @sockopt_cache[:int32] - length = FFI::MemoryPointer.new :int32 - length.write_int 4 - @sockopt_cache[:int32] = [FFI::MemoryPointer.new(:int32), length] + # Send a sequence of messages as a multipart message out of the +parts+ + # passed in for transmission. Every element of +parts+ should be + # a Message (or subclass). + # + # +flags+ may be ZMQ::NOBLOCK. + # + # Returns 0 when the message was successfully enqueued. + # Returns -1 under two conditions. + # 1. The message could not be enqueued + # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def sendmsgs parts, flags = 0 + return -1 if !parts || parts.empty? + + parts[0..-2].each do |part| + rc = send part, flags | ZMQ::SNDMORE + return rc unless Util.resultcode_ok?(rc) + end + + send parts[-1], flags end - @sockopt_cache[:int32] - when IDENTITY - length = FFI::MemoryPointer.new :int64 - # could be a string of up to 255 bytes - length.write_long_long 255 - [FFI::MemoryPointer.new(255), length] - end - end + # Sends a message. This will automatically close the +message+ for both successful + # and failed sends. + # + # Returns 0 when the message was successfully enqueued. + # Returns -1 under two conditions. + # 1. The message could not be enqueued + # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def send_and_close message, flags = 0 + rc = send message, flags + message.close + rc + end - def sanitize_value option_name, option_value - case option_name - when HWM, AFFINITY, SNDBUF, RCVBUF - option_value.abs - when MCAST_LOOP - option_value ? 1 : 0 - else - option_value - end - end - - def release_cache - @sockopt_cache.clear - end + # Dequeues a message from the underlying queue. By default, this is a blocking operation. + # + # +flags+ may take two values: + # 0 (default) - blocking operation + # ZMQ::NOBLOCK - non-blocking operation + # + # Returns 0 when the message was successfully dequeued. + # Returns -1 under two conditions. + # 1. The message could not be dequeued + # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + # The application code is responsible for handling the +message+ object lifecycle + # when #recv returns an error code. + # + def recv message, flags = 0 + LibZMQ.zmq_recv @socket, message.address, flags + end - # require a minimum of 0mq 2.1.0 to support socket finalizers; it contains important - # fixes for sockets and threads so that a garbage collector thread can successfully - # reap this resource without crashing - if Util.minimum_api?([2, 1, 0]) - def define_finalizer - ObjectSpace.define_finalizer(self, self.class.close(@socket)) - end - else - def define_finalizer - # no op - end - end + # Converts the received message to a string and replaces the +string+ arg + # contents. + # + # +string+ should be an empty string, .e.g. '' + # +flags+ may be ZMQ::NOBLOCK. + # + # Returns 0 when the message was successfully dequeued. + # Returns -1 under two conditions. + # 1. The message could not be dequeued + # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def recv_string string, flags = 0 + message = @receiver_klass.new + rc = recv message, flags + string.replace(message.copy_out_string) if Util.resultcode_ok?(rc) + message.close + rc + end - def remove_finalizer - ObjectSpace.undefine_finalizer self - end + # Receive a multipart message as a list of strings. + # + # +list+ should be an object that responds to #append or #<< so received + # strings can be appended to it + # +flag+ may be ZMQ::NOBLOCK. Any other flag will be removed + # + # Returns 0 when all messages were successfully dequeued. + # Returns -1 under two conditions. + # 1. A message could not be dequeued + # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. Also, the +list+ will not be modified when there was an error. + # + def recv_strings list, flag = 0 + array = [] + rc = recvmsgs array, flag + + if Util.resultcode_ok?(rc) + array.each do |message| + list << message.copy_out_string + message.close + end + end + + rc + end - def self.close socket - Proc.new { LibZMQ.zmq_close socket } - end -end # class Socket + # Receive a multipart message as an array of objects + # (by default these are instances of Message). + # + # +list+ should be an object that responds to #append or #<< so received + # messages can be appended to it + # +flag+ may be ZMQ::NOBLOCK. Any other flag will be + # removed. + # + # Returns 0 when all messages were successfully dequeued. + # Returns -1 under two conditions. + # 1. A message could not be dequeued + # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. Also, the +list+ will not be modified when there was an error. + # + def recvmsgs list, flag = 0 + flag = NOBLOCK if noblock?(flag) + + parts = [] + message = @receiver_klass.new + rc = recv message, flag + parts << message + + # check rc *first*; necessary because the call to #more_parts? can reset + # the zmq_errno to a weird value, so the zmq_errno that was set on the + # call to #recv gets lost + while Util.resultcode_ok?(rc) && more_parts? + message = @receiver_klass.new + rc = recv message, flag + parts << message + end + + # only append the received parts if there were no errors + # FIXME: + # need to detect EAGAIN if flag is set; EAGAIN means we have read all that we + # can and should return whatever was already read; need a spec! + if Util.resultcode_ok?(rc) + parts.each { |part| list << part } + end + + rc + end + + + private + + def noblock? flags + (NOBLOCK & flags) == NOBLOCK + end + + def int_option? name + super || + RECONNECT_IVL_MAX == name + end + + def long_long_option? name + super || + HWM == name || + SWAP == name || + RATE == name || + RECOVERY_IVL == name || + RECOVERY_IVL_MSEC == name || + MCAST_LOOP == name || + SNDBUF == name || + RCVBUF == name + end + + # these finalizer-related methods cannot live in the CommonSocketBehavior + # module; they *must* be in the class definition directly + + def define_finalizer + ObjectSpace.define_finalizer(self, self.class.close(@socket)) + end + + def remove_finalizer + ObjectSpace.undefine_finalizer self + end + + def self.close socket + Proc.new { LibZMQ.zmq_close socket } + end + end # class Socket for version2 + + end # LibZMQ.version2? + + + if LibZMQ.version3? || LibZMQ.version4? + class Socket + include CommonSocketBehavior + include IdentitySupport + + # Get the options set on this socket. + # + # +name+ determines the socket option to request + # +array+ should be an empty array; a result of the proper type + # (numeric, string, boolean) will be inserted into + # the first position. + # + # Valid +option_name+ values: + # ZMQ::RCVMORE - true or false + # ZMQ::HWM - integer + # ZMQ::SWAP - integer + # ZMQ::AFFINITY - bitmap in an integer + # ZMQ::IDENTITY - string + # ZMQ::RATE - integer + # ZMQ::RECOVERY_IVL - integer + # ZMQ::SNDBUF - integer + # ZMQ::RCVBUF - integer + # ZMQ::FD - fd in an integer + # ZMQ::EVENTS - bitmap integer + # ZMQ::LINGER - integer measured in milliseconds + # ZMQ::RECONNECT_IVL - integer measured in milliseconds + # ZMQ::BACKLOG - integer + # ZMQ::RECOVER_IVL_MSEC - integer measured in milliseconds + # + # Returns 0 when the operation completed successfully. + # Returns -1 when this operation failed. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + # # retrieve high water mark + # array = [] + # rc = socket.getsockopt(ZMQ::HWM, array) + # hwm = array.first if ZMQ::Util.resultcode_ok?(rc) + # + def getsockopt name, array + rc = __getsockopt__ name, array + + if Util.resultcode_ok?(rc) && (RCVMORE == name) + # convert to boolean + array[0] = 1 == array[0] + end + + rc + end + + # The last message part received is tested to see if it is a label. + # + # Equivalent to calling Socket#getsockopt with ZMQ::RCVLABEL. + # + # Warning: if the call to #getsockopt fails, this method will return + # false and swallow the error. + # + # labels = [] + # message_parts = [] + # message = Message.new + # rc = socket.recv(message) + # if ZMQ::Util.resultcode_ok?(rc) + # label? ? labels.push(message) : message_parts.push(message) + # while more_parts? + # message = Message.new + # if ZMQ::Util.resulcode_ok?(socket.recv(message)) + # label? ? labels.push(message) : message_parts.push(message) + # end + # end + # end + # + def label? + array = [] + rc = getsockopt ZMQ::RCVLABEL, array + + Util.resultcode_ok?(rc) ? array.at(0) : false + end + + # Queues the message for transmission. Message is assumed to conform to the + # same public API as #Message. + # + # +flags+ may take two values: + # * 0 (default) - blocking operation + # * ZMQ::DONTWAIT - non-blocking operation + # * ZMQ::SNDMORE - this message is part of a multi-part message + # + # Returns 0 when the message was successfully enqueued. + # Returns -1 under two conditions. + # 1. The message could not be enqueued + # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def sendmsg message, flags = 0 + LibZMQ.zmq_sendmsg @socket, message.address, flags + end + + # Helper method to make a new #Message instance out of the +string+ passed + # in for transmission. + # + # +flags+ may be ZMQ::DONTWAIT and ZMQ::SNDMORE. + # + # Returns 0 when the message was successfully enqueued. + # Returns -1 under two conditions. + # 1. The message could not be enqueued + # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def send_string string, flags = 0 + message = Message.new string + send_and_close message, flags + end + + # Send a sequence of strings as a multipart message out of the +parts+ + # passed in for transmission. Every element of +parts+ should be + # a String. + # + # +flags+ may be ZMQ::DONTWAIT. + # + # Returns 0 when the messages were successfully enqueued. + # Returns -1 under two conditions. + # 1. A message could not be enqueued + # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def send_strings parts, flags = 0 + return -1 if !parts || parts.empty? + flags = DONTWAIT if dontwait?(flags) + + parts[0..-2].each do |part| + rc = send_string part, (flags | ZMQ::SNDMORE) + return rc unless Util.resultcode_ok?(rc) + end + + send_string parts[-1], flags + end + + # Send a sequence of messages as a multipart message out of the +parts+ + # passed in for transmission. Every element of +parts+ should be + # a Message (or subclass). + # + # +flags+ may be ZMQ::DONTWAIT. + # + # Returns 0 when the messages were successfully enqueued. + # Returns -1 under two conditions. + # 1. A message could not be enqueued + # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def sendmsgs parts, flags = 0 + return -1 if !parts || parts.empty? + flags = DONTWAIT if dontwait?(flags) + + parts[0..-2].each do |part| + rc = sendmsg part, (flags | ZMQ::SNDMORE) + return rc unless Util.resultcode_ok?(rc) + end + + sendmsg parts[-1], flags + end + + # Sends a message. This will automatically close the +message+ for both successful + # and failed sends. + # + # Returns 0 when the message was successfully enqueued. + # Returns -1 under two conditions. + # 1. The message could not be enqueued + # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def send_and_close message, flags = 0 + rc = sendmsg message, flags + message.close + rc + end + + # Dequeues a message from the underlying queue. By default, this is a blocking operation. + # + # +flags+ may take two values: + # 0 (default) - blocking operation + # ZMQ::DONTWAIT - non-blocking operation + # + # Returns 0 when the message was successfully dequeued. + # Returns -1 under two conditions. + # 1. The message could not be dequeued + # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + # The application code is responsible for handling the +message+ object lifecycle + # when #recv returns an error code. + # + def recvmsg message, flags = 0 + LibZMQ.zmq_recvmsg @socket, message.address, flags + end + + # Helper method to make a new #Message instance and convert its payload + # to a string. + # + # +flags+ may be ZMQ::DONTWAIT. + # + # Returns 0 when the message was successfully dequeued. + # Returns -1 under two conditions. + # 1. The message could not be dequeued + # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + # The application code is responsible for handling the +message+ object lifecycle + # when #recv returns an error code. + # + def recv_string string, flags = 0 + message = @receiver_klass.new + rc = recvmsg message, flags + string.replace(message.copy_out_string) if Util.resultcode_ok?(rc) + message.close + rc + end + + # Receive a multipart message as a list of strings. + # + # +flag+ may be ZMQ::DONTWAIT. Any other flag will be + # removed. + # + def recv_strings list, flag = 0 + array = [] + rc = recvmsgs array, flag + + if Util.resultcode_ok?(rc) + array.each do |message| + list << message.copy_out_string + message.close + end + end + + rc + end + + # Receive a multipart message as an array of objects + # (by default these are instances of Message). + # + # +flag+ may be ZMQ::DONTWAIT. Any other flag will be + # removed. + # + # Raises the same exceptions as Socket#recv. + # + def recvmsgs list, flag = 0 + flag = DONTWAIT if dontwait?(flag) + + parts = [] + message = @receiver_klass.new + rc = recvmsg message, flag + parts << message + + # check rc *first*; necessary because the call to #more_parts? can reset + # the zmq_errno to a weird value, so the zmq_errno that was set on the + # call to #recv gets lost + while Util.resultcode_ok?(rc) && more_parts? + message = @receiver_klass.new + rc = recvmsg message, flag + parts << message + end + + # only append the received parts if there were no errors + if Util.resultcode_ok?(rc) + parts.each { |part| list << part } + end + + rc + end + + + private + + def dontwait? flags + (DONTWAIT & flags) == DONTWAIT + end + alias :noblock? :dontwait? + + def int_option? name + super || + RCVLABEL == name || + RECONNECT_IVL_MAX == name || + RCVHWM == name || + SNDHWM == name || + RATE == name || + RECOVERY_IVL == name || + SNDBUF == name || + RCVBUF == name + end + + # these finalizer-related methods cannot live in the CommonSocketBehavior + # module; they *must* be in the class definition directly + + def define_finalizer + ObjectSpace.define_finalizer(self, self.class.close(@socket)) + end + + def remove_finalizer + ObjectSpace.undefine_finalizer self + end + + def self.close socket + Proc.new { LibZMQ.zmq_close socket } + end + end # Socket for version3 + end # LibZMQ.version3? end # module ZMQ