lib/ffi-rzmq/socket.rb in ffi-rzmq-0.9.3 vs lib/ffi-rzmq/socket.rb in ffi-rzmq-0.9.6

- old
+ new

@@ -1,10 +1,9 @@ module ZMQ module CommonSocketBehavior - include ZMQ::Util attr_reader :socket, :name # Allocates a socket of type +type+ for sending and receiving data. # @@ -80,10 +79,11 @@ end else raise ContextError.new 'zmq_socket', 0, ETERM, "Context pointer was null" end + @longlong_cache = @int_cache = nil @more_parts_array = [] @option_lookup = [] populate_option_lookup define_finalizer @@ -155,16 +155,16 @@ # Warning: if the call to #getsockopt fails, this method will return # false and swallow the error. # # message_parts = [] # message = Message.new - # rc = socket.recv(message) + # rc = socket.recvmsg(message) # if ZMQ::Util.resultcode_ok?(rc) # message_parts << message # while more_parts? # message = Message.new - # rc = socket.recv(message) + # rc = socket.recvmsg(message) # message_parts.push(message) if resulcode_ok?(rc) # end # end # def more_parts? @@ -181,11 +181,11 @@ LibZMQ.zmq_bind @socket, address end # Connects the socket to an +address+. # - # socket.connect("tcp://127.0.0.1:5555") + # rc = socket.connect("tcp://127.0.0.1:5555") # def connect address rc = LibZMQ.zmq_connect @socket, address end @@ -208,18 +208,250 @@ else 0 end end + # Queues the message for transmission. Message is assumed to conform to the + # same public API as #Message. + # + # +flags+ may take two values: + # * 0 (default) - blocking operation + # * ZMQ::NonBlocking - non-blocking operation + # * ZMQ::SNDMORE - this message is part of a multi-part message + # + # Returns 0 when the message was successfully enqueued. + # Returns -1 under two conditions. + # 1. The message could not be enqueued + # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def sendmsg message, flags = 0 + __sendmsg__(@socket, message.address, flags) + end + # Helper method to make a new #Message instance out of the +string+ passed + # in for transmission. + # + # +flags+ may be ZMQ::NonBlocking and ZMQ::SNDMORE. + # + # Returns 0 when the message was successfully enqueued. + # Returns -1 under two conditions. + # 1. The message could not be enqueued + # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def send_string string, flags = 0 + message = Message.new string + send_and_close message, flags + end + + # Send a sequence of strings as a multipart message out of the +parts+ + # passed in for transmission. Every element of +parts+ should be + # a String. + # + # +flags+ may be ZMQ::NonBlocking. + # + # Returns 0 when the messages were successfully enqueued. + # Returns -1 under two conditions. + # 1. A message could not be enqueued + # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def send_strings parts, flags = 0 + return -1 if !parts || parts.empty? + flags = NonBlocking if dontwait?(flags) + + parts[0..-2].each do |part| + rc = send_string part, (flags | ZMQ::SNDMORE) + return rc unless Util.resultcode_ok?(rc) + end + + send_string parts[-1], flags + end + + # Send a sequence of messages as a multipart message out of the +parts+ + # passed in for transmission. Every element of +parts+ should be + # a Message (or subclass). + # + # +flags+ may be ZMQ::NonBlocking. + # + # Returns 0 when the messages were successfully enqueued. + # Returns -1 under two conditions. + # 1. A message could not be enqueued + # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def sendmsgs parts, flags = 0 + return -1 if !parts || parts.empty? + flags = NonBlocking if dontwait?(flags) + + parts[0..-2].each do |part| + rc = sendmsg part, (flags | ZMQ::SNDMORE) + return rc unless Util.resultcode_ok?(rc) + end + + sendmsg parts[-1], flags + end + + # Sends a message. This will automatically close the +message+ for both successful + # and failed sends. + # + # Returns 0 when the message was successfully enqueued. + # Returns -1 under two conditions. + # 1. The message could not be enqueued + # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + def send_and_close message, flags = 0 + rc = sendmsg message, flags + message.close + rc + end + + # Dequeues a message from the underlying queue. By default, this is a blocking operation. + # + # +flags+ may take two values: + # 0 (default) - blocking operation + # ZMQ::NonBlocking - non-blocking operation + # + # Returns 0 when the message was successfully dequeued. + # Returns -1 under two conditions. + # 1. The message could not be dequeued + # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + # The application code is responsible for handling the +message+ object lifecycle + # when #recv returns an error code. + # + def recvmsg message, flags = 0 + #LibZMQ.zmq_recvmsg @socket, message.address, flags + __recvmsg__(@socket, message.address, flags) + end + + # Helper method to make a new #Message instance and convert its payload + # to a string. + # + # +flags+ may be ZMQ::NonBlocking. + # + # Returns 0 when the message was successfully dequeued. + # Returns -1 under two conditions. + # 1. The message could not be dequeued + # 2. When +flags+ is set with ZMQ::NonBlocking and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. + # + # The application code is responsible for handling the +message+ object lifecycle + # when #recv returns an error code. + # + def recv_string string, flags = 0 + message = @receiver_klass.new + rc = recvmsg message, flags + string.replace(message.copy_out_string) if Util.resultcode_ok?(rc) + message.close + rc + end + + # Receive a multipart message as a list of strings. + # + # +flag+ may be ZMQ::NonBlocking. Any other flag will be + # removed. + # + def recv_strings list, flag = 0 + array = [] + rc = recvmsgs array, flag + + if Util.resultcode_ok?(rc) + array.each do |message| + list << message.copy_out_string + message.close + end + end + + rc + end + + # Receive a multipart message as an array of objects + # (by default these are instances of Message). + # + # +flag+ may be ZMQ::NonBlocking. Any other flag will be + # removed. + # + def recvmsgs list, flag = 0 + flag = NonBlocking if dontwait?(flag) + + message = @receiver_klass.new + rc = recvmsg message, flag + + if Util.resultcode_ok?(rc) + list << message + + # check rc *first*; necessary because the call to #more_parts? can reset + # the zmq_errno to a weird value, so the zmq_errno that was set on the + # call to #recv gets lost + while Util.resultcode_ok?(rc) && more_parts? + message = @receiver_klass.new + rc = recvmsg message, flag + + if Util.resultcode_ok?(rc) + list << message + else + message.close + list.each { |msg| msg.close } + list.clear + end + end + else + message.close + end + + rc + end + + # Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes + # a +list+ for receiving the message body parts and a +routing_envelope+ + # for receiving the message parts comprising the 0mq routing information. + # + def recv_multipart list, routing_envelope, flag = 0 + parts = [] + rc = recvmsgs parts, flag + + if Util.resultcode_ok?(rc) + routing = true + parts.each do |part| + if routing + routing_envelope << part + routing = part.size > 0 + else + list << part + end + end + end + + rc + end + + private def __getsockopt__ name, array # a small optimization so we only have to determine the option # type a single time; gives approx 5% speedup to do it this way. option_type = @option_lookup[name] - + value, length = sockopt_buffers option_type rc = LibZMQ.zmq_getsockopt @socket, name, value, length if Util.resultcode_ok?(rc) @@ -274,26 +506,31 @@ end @int_cache end end - + def populate_option_lookup # integer options - [EVENTS, LINGER, RECONNECT_IVL, FD, TYPE, BACKLOG].each { |option| @option_lookup[option] = 0 } + [EVENTS, LINGER, RCVTIMEO, SNDTIMEO, RECONNECT_IVL, FD, TYPE, BACKLOG].each { |option| @option_lookup[option] = 0 } # long long options [RCVMORE, AFFINITY].each { |option| @option_lookup[option] = 1 } - + # string options [SUBSCRIBE, UNSUBSCRIBE].each { |option| @option_lookup[option] = 2 } end def release_cache @longlong_cache = nil @int_cache = nil end + + def dontwait?(flags) + (NonBlocking & flags) == NonBlocking + end + alias :noblock? :dontwait? end # module CommonSocketBehavior module IdentitySupport @@ -311,18 +548,18 @@ setsockopt IDENTITY, value.to_s end private - + def populate_option_lookup super() - + # string options [IDENTITY].each { |option| @option_lookup[option] = 2 } end - + end # module IdentitySupport if LibZMQ.version2? @@ -378,285 +615,31 @@ end rc end - # Queues the message for transmission. Message is assumed to conform to the - # same public API as #Message. - # - # +flags+ may take two values: - # * 0 (default) - blocking operation - # * ZMQ::NOBLOCK - non-blocking operation - # * ZMQ::SNDMORE - this message is part of a multi-part message - # - # Returns 0 when the message was successfully enqueued. - # Returns -1 under two conditions. - # 1. The message could not be enqueued - # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - # The application code is responsible for handling the +message+ object - # lifecycle when #send returns. Regardless of the return code, the user - # is responsible for calling message.close to free the memory in use. - # - def send message, flags = 0 - LibZMQ.zmq_send @socket, message.address, flags - end - # Helper method to make a new #Message instance out of the +message_string+ passed - # in for transmission. - # - # +flags+ may be ZMQ::NOBLOCK. - # - # Returns 0 when the message was successfully enqueued. - # Returns -1 under two conditions. - # 1. The message could not be enqueued - # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - def send_string message_string, flags = 0 - message = Message.new message_string - send_and_close message, flags - end + private - # Send a sequence of strings as a multipart message out of the +parts+ - # passed in for transmission. Every element of +parts+ should be - # a String. - # - # +flags+ may be ZMQ::NOBLOCK. - # - # Returns 0 when the message was successfully enqueued. - # Returns -1 under two conditions. - # 1. The message could not be enqueued - # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - def send_strings parts, flags = 0 - return -1 if !parts || parts.empty? - - parts[0..-2].each do |part| - rc = send_string part, flags | ZMQ::SNDMORE - return rc unless Util.resultcode_ok?(rc) - end - - send_string parts[-1], flags + def __sendmsg__(socket, address, flags) + LibZMQ.zmq_send(socket, address, flags) end - # Send a sequence of messages as a multipart message out of the +parts+ - # passed in for transmission. Every element of +parts+ should be - # a Message (or subclass). - # - # +flags+ may be ZMQ::NOBLOCK. - # - # Returns 0 when the message was successfully enqueued. - # Returns -1 under two conditions. - # 1. The message could not be enqueued - # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - def sendmsgs parts, flags = 0 - return -1 if !parts || parts.empty? - - parts[0..-2].each do |part| - rc = send part, flags | ZMQ::SNDMORE - return rc unless Util.resultcode_ok?(rc) - end - - send parts[-1], flags + def __recvmsg__(socket, address, flags) + LibZMQ.zmq_recv(socket, address, flags) end - # Sends a message. This will automatically close the +message+ for both successful - # and failed sends. - # - # Returns 0 when the message was successfully enqueued. - # Returns -1 under two conditions. - # 1. The message could not be enqueued - # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - def send_and_close message, flags = 0 - rc = send message, flags - message.close - rc - end + def populate_option_lookup + super() - # Dequeues a message from the underlying queue. By default, this is a blocking operation. - # - # +flags+ may take two values: - # 0 (default) - blocking operation - # ZMQ::NOBLOCK - non-blocking operation - # - # Returns 0 when the message was successfully dequeued. - # Returns -1 under two conditions. - # 1. The message could not be dequeued - # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - # The application code is responsible for handling the +message+ object lifecycle - # when #recv returns an error code. - # - def recv message, flags = 0 - LibZMQ.zmq_recv @socket, message.address, flags - end + # integer options + [RECONNECT_IVL_MAX].each { |option| @option_lookup[option] = 0 } - # Converts the received message to a string and replaces the +string+ arg - # contents. - # - # +string+ should be an empty string, .e.g. '' - # +flags+ may be ZMQ::NOBLOCK. - # - # Returns 0 when the message was successfully dequeued. - # Returns -1 under two conditions. - # 1. The message could not be dequeued - # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - def recv_string string, flags = 0 - message = @receiver_klass.new - rc = recv message, flags - string.replace(message.copy_out_string) if Util.resultcode_ok?(rc) - message.close - rc + # long long options + [HWM, SWAP, RATE, RECOVERY_IVL, RECOVERY_IVL_MSEC, MCAST_LOOP, SNDBUF, RCVBUF].each { |option| @option_lookup[option] = 1 } end - # Receive a multipart message as a list of strings. - # - # +list+ should be an object that responds to #append or #<< so received - # strings can be appended to it - # +flag+ may be ZMQ::NOBLOCK. Any other flag will be removed - # - # Returns 0 when all messages were successfully dequeued. - # Returns -1 under two conditions. - # 1. A message could not be dequeued - # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. Also, the +list+ will not be modified when there was an error. - # - def recv_strings list, flag = 0 - array = [] - rc = recvmsgs array, flag - - if Util.resultcode_ok?(rc) - array.each do |message| - list << message.copy_out_string - message.close - end - end - - rc - end - - # Receive a multipart message as an array of objects - # (by default these are instances of Message). - # - # +list+ should be an object that responds to #append or #<< so received - # messages can be appended to it - # +flag+ may be ZMQ::NOBLOCK. Any other flag will be - # removed. - # - # Returns 0 when all messages were successfully dequeued. - # Returns -1 under two conditions. - # 1. A message could not be dequeued - # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. Also, the +list+ will not be modified when there was an error. - # - def recvmsgs list, flag = 0 - flag = NOBLOCK if noblock?(flag) - - message = @receiver_klass.new - rc = recv message, flag - - if Util.resultcode_ok?(rc) - list << message - - # check rc *first*; necessary because the call to #more_parts? can reset - # the zmq_errno to a weird value, so the zmq_errno that was set on the - # call to #recv gets lost - while Util.resultcode_ok?(rc) && more_parts? - message = @receiver_klass.new - rc = recv message, flag - - if Util.resultcode_ok?(rc) - list << message - else - message.close - list.each { |msg| msg.close } - list.clear - end - end - else - message.close - end - - rc - end - - # Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes - # a +list+ for receiving the message body parts and a +routing_envelope+ - # for receiving the message parts comprising the 0mq routing information. - # - # Returns 0 when all messages were successfully dequeued. - # Returns -1 under two conditions. - # 1. A message could not be dequeued - # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. Also, the +list+ *may* be modified when there was an error. - # - def recv_multipart list, routing_envelope, flag = 0 - parts = [] - rc = recvmsgs parts, flag - - if Util.resultcode_ok?(rc) - routing = true - parts.each do |part| - if routing - routing_envelope << part - routing = part.size > 0 - else - list << part - end - end - end - - rc - end - - - private - - def noblock? flags - (NOBLOCK & flags) == NOBLOCK - end - - def populate_option_lookup - super() - - # integer options - [RECONNECT_IVL_MAX].each { |option| @option_lookup[option] = 0 } - - # long long options - [HWM, SWAP, RATE, RECOVERY_IVL, RECOVERY_IVL_MSEC, MCAST_LOOP, SNDBUF, RCVBUF].each { |option| @option_lookup[option] = 1 } - end - # these finalizer-related methods cannot live in the CommonSocketBehavior # module; they *must* be in the class definition directly def define_finalizer ObjectSpace.define_finalizer(self, self.class.close(@socket)) @@ -700,10 +683,11 @@ # ZMQ::EVENTS - bitmap integer # ZMQ::LINGER - integer measured in milliseconds # ZMQ::RECONNECT_IVL - integer measured in milliseconds # ZMQ::BACKLOG - integer # ZMQ::RECOVER_IVL_MSEC - integer measured in milliseconds + # ZMQ::IPV4ONLY - integer # # Returns 0 when the operation completed successfully. # Returns -1 when this operation failed. # # With a -1 return code, the user must check ZMQ.errno to determine the @@ -722,266 +706,48 @@ array[0] = 1 == array[0] end rc end - - # Queues the message for transmission. Message is assumed to conform to the - # same public API as #Message. + + # Version3 only # - # +flags+ may take two values: - # * 0 (default) - blocking operation - # * ZMQ::DONTWAIT - non-blocking operation - # * ZMQ::SNDMORE - this message is part of a multi-part message + # Disconnect the socket from the given +endpoint+. # - # Returns 0 when the message was successfully enqueued. - # Returns -1 under two conditions. - # 1. The message could not be enqueued - # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - def sendmsg message, flags = 0 - LibZMQ.zmq_sendmsg @socket, message.address, flags + def disconnect(endpoint) + LibZMQ.zmq_disconnect(endpoint) end - - # Helper method to make a new #Message instance out of the +string+ passed - # in for transmission. + + # Version3 only # - # +flags+ may be ZMQ::DONTWAIT and ZMQ::SNDMORE. + # Unbind the socket from the given +endpoint+. # - # Returns 0 when the message was successfully enqueued. - # Returns -1 under two conditions. - # 1. The message could not be enqueued - # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - def send_string string, flags = 0 - message = Message.new string - send_and_close message, flags + def unbind(endpoint) + LibZMQ.zmq_unbind(endpoint) end - # Send a sequence of strings as a multipart message out of the +parts+ - # passed in for transmission. Every element of +parts+ should be - # a String. - # - # +flags+ may be ZMQ::DONTWAIT. - # - # Returns 0 when the messages were successfully enqueued. - # Returns -1 under two conditions. - # 1. A message could not be enqueued - # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - def send_strings parts, flags = 0 - return -1 if !parts || parts.empty? - flags = DONTWAIT if dontwait?(flags) - parts[0..-2].each do |part| - rc = send_string part, (flags | ZMQ::SNDMORE) - return rc unless Util.resultcode_ok?(rc) - end + private - send_string parts[-1], flags + def __sendmsg__(socket, address, flags) + LibZMQ.zmq_sendmsg(socket, address, flags) end - # Send a sequence of messages as a multipart message out of the +parts+ - # passed in for transmission. Every element of +parts+ should be - # a Message (or subclass). - # - # +flags+ may be ZMQ::DONTWAIT. - # - # Returns 0 when the messages were successfully enqueued. - # Returns -1 under two conditions. - # 1. A message could not be enqueued - # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - def sendmsgs parts, flags = 0 - return -1 if !parts || parts.empty? - flags = DONTWAIT if dontwait?(flags) - - parts[0..-2].each do |part| - rc = sendmsg part, (flags | ZMQ::SNDMORE) - return rc unless Util.resultcode_ok?(rc) - end - - sendmsg parts[-1], flags + def __recvmsg__(socket, address, flags) + LibZMQ.zmq_recvmsg(socket, address, flags) end - # Sends a message. This will automatically close the +message+ for both successful - # and failed sends. - # - # Returns 0 when the message was successfully enqueued. - # Returns -1 under two conditions. - # 1. The message could not be enqueued - # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - def send_and_close message, flags = 0 - rc = sendmsg message, flags - message.close - rc - end + def populate_option_lookup + super() - # Dequeues a message from the underlying queue. By default, this is a blocking operation. - # - # +flags+ may take two values: - # 0 (default) - blocking operation - # ZMQ::DONTWAIT - non-blocking operation - # - # Returns 0 when the message was successfully dequeued. - # Returns -1 under two conditions. - # 1. The message could not be dequeued - # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - # The application code is responsible for handling the +message+ object lifecycle - # when #recv returns an error code. - # - def recvmsg message, flags = 0 - LibZMQ.zmq_recvmsg @socket, message.address, flags + # integer options + [RECONNECT_IVL_MAX, RCVHWM, SNDHWM, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, IPV4ONLY, + ROUTER_BEHAVIOR, TCP_KEEPALIVE, TCP_KEEPALIVE_CNT, + TCP_KEEPALIVE_IDLE, TCP_KEEPALIVE_INTVL, TCP_ACCEPT_FILTER].each { |option| @option_lookup[option] = 0 } + + # long long options + [MAXMSGSIZE].each { |option| @option_lookup[option] = 1 } end - - # Helper method to make a new #Message instance and convert its payload - # to a string. - # - # +flags+ may be ZMQ::DONTWAIT. - # - # Returns 0 when the message was successfully dequeued. - # Returns -1 under two conditions. - # 1. The message could not be dequeued - # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN. - # - # With a -1 return code, the user must check ZMQ.errno to determine the - # cause. - # - # The application code is responsible for handling the +message+ object lifecycle - # when #recv returns an error code. - # - def recv_string string, flags = 0 - message = @receiver_klass.new - rc = recvmsg message, flags - string.replace(message.copy_out_string) if Util.resultcode_ok?(rc) - message.close - rc - end - - # Receive a multipart message as a list of strings. - # - # +flag+ may be ZMQ::DONTWAIT. Any other flag will be - # removed. - # - def recv_strings list, flag = 0 - array = [] - rc = recvmsgs array, flag - - if Util.resultcode_ok?(rc) - array.each do |message| - list << message.copy_out_string - message.close - end - end - - rc - end - - # Receive a multipart message as an array of objects - # (by default these are instances of Message). - # - # +flag+ may be ZMQ::DONTWAIT. Any other flag will be - # removed. - # - def recvmsgs list, flag = 0 - flag = DONTWAIT if dontwait?(flag) - - message = @receiver_klass.new - rc = recvmsg message, flag - - if Util.resultcode_ok?(rc) - list << message - - # check rc *first*; necessary because the call to #more_parts? can reset - # the zmq_errno to a weird value, so the zmq_errno that was set on the - # call to #recv gets lost - while Util.resultcode_ok?(rc) && more_parts? - message = @receiver_klass.new - rc = recvmsg message, flag - - if Util.resultcode_ok?(rc) - list << message - else - message.close - list.each { |msg| msg.close } - list.clear - end - end - else - message.close - end - - rc - end - - # Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes - # a +list+ for receiving the message body parts and a +routing_envelope+ - # for receiving the message parts comprising the 0mq routing information. - # - def recv_multipart list, routing_envelope, flag = 0 - parts = [] - rc = recvmsgs parts, flag - - if Util.resultcode_ok?(rc) - routing = true - parts.each do |part| - if routing - routing_envelope << part - routing = part.size > 0 - else - list << part - end - end - end - - rc - end - - - private - - def dontwait? flags - (DONTWAIT & flags) == DONTWAIT - end - alias :noblock? :dontwait? - - def int_option? name - super(name) || - RECONNECT_IVL_MAX == name || - RCVHWM == name || - SNDHWM == name || - RATE == name || - RECOVERY_IVL == name || - SNDBUF == name || - RCVBUF == name - end - - def populate_option_lookup - super() - - # integer options - [RECONNECT_IVL_MAX, RCVHWM, SNDHWM, RATE, RECOVERY_IVL, SNDBUF, RCVBUF].each { |option| @option_lookup[option] = 0 } - end # these finalizer-related methods cannot live in the CommonSocketBehavior # module; they *must* be in the class definition directly def define_finalizer