lib/ffi-rzmq/socket.rb in ffi-rzmq-0.9.0 vs lib/ffi-rzmq/socket.rb in ffi-rzmq-0.9.2

- old
+ new

@@ -32,11 +32,11 @@ # end # def self.create context_ptr, type, opts = {:receiver_class => ZMQ::Message} new(context_ptr, type, opts) rescue nil end - + # To avoid rescuing exceptions, use the factory method #create for # all socket creation. # # Allocates a socket of type +type+ for sending and receiving data. # @@ -66,11 +66,11 @@ # def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message} # users may override the classes used for receiving; class must conform to the # same public API as ZMQ::Message @receiver_klass = opts[:receiver_class] - + context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context) unless context_ptr.null? @socket = LibZMQ.zmq_socket context_ptr, type if @socket && !@socket.null? @@ -98,18 +98,17 @@ # ZMQ::MCAST_LOOP (version 2 only) # ZMQ::LINGER # ZMQ::RECONNECT_IVL # ZMQ::BACKLOG # ZMQ::RECOVER_IVL_MSEC (version 2 only) - # ZMQ::RECONNECT_IVL_MAX (version 3/4 only) - # ZMQ::MAXMSGSIZE (version 3/4 only) - # ZMQ::SNDHWM (version 3/4 only) - # ZMQ::RCVHWM (version 3/4 only) - # ZMQ::MULTICAST_HOPS (version 3/4 only) - # ZMQ::RCVTIMEO (version 3/4 only) - # ZMQ::SNDTIMEO (version 3/4 only) - # ZMQ::RCVLABEL (version 3/4 only) + # ZMQ::RECONNECT_IVL_MAX (version 3 only) + # ZMQ::MAXMSGSIZE (version 3 only) + # ZMQ::SNDHWM (version 3 only) + # ZMQ::RCVHWM (version 3 only) + # ZMQ::MULTICAST_HOPS (version 3 only) + # ZMQ::RCVTIMEO (version 3 only) + # ZMQ::SNDTIMEO (version 3 only) # # Valid +name+ values that take a string +value+ are: # ZMQ::IDENTITY (version 2/3 only) # ZMQ::SUBSCRIBE # ZMQ::UNSUBSCRIBE @@ -167,11 +166,11 @@ # end # def more_parts? array = [] rc = getsockopt ZMQ::RCVMORE, array - + Util.resultcode_ok?(rc) ? array.at(0) : false end # Binds the socket to an +address+. # @@ -195,11 +194,11 @@ # Returns 0 upon success *or* when the socket has already been closed. # Returns -1 when the operation fails. Check ZMQ.errno for the error code. # # rc = socket.close # puts("Given socket was invalid!") unless 0 == rc - # + # def close if @socket remove_finalizer rc = LibZMQ.zmq_close @socket @socket = nil @@ -242,37 +241,37 @@ unless @sockopt_cache[:int64] length = FFI::MemoryPointer.new :size_t length.write_int 8 @sockopt_cache[:int64] = [FFI::MemoryPointer.new(:int64), length] end - + @sockopt_cache[:int64] elsif int_option?(name) # int, 0mq assumes int is 4-bytes unless @sockopt_cache[:int32] length = FFI::MemoryPointer.new :size_t length.write_int 4 @sockopt_cache[:int32] = [FFI::MemoryPointer.new(:int32), length] end - + @sockopt_cache[:int32] elsif string_option?(name) length = FFI::MemoryPointer.new :size_t # could be a string of up to 255 bytes length.write_int 255 [FFI::MemoryPointer.new(255), length] - + else # uh oh, someone passed in an unknown option; use a slop buffer unless @sockopt_cache[:unknown] length = FFI::MemoryPointer.new :size_t length.write_int 4 @sockopt_cache[:unknown] = [FFI::MemoryPointer.new(:int32), length] end - + @sockopt_cache[:unknown] end end def supported_option? name @@ -384,16 +383,16 @@ # rc = socket.getsockopt(ZMQ::HWM, array) # hwm = array.first if ZMQ::Util.resultcode_ok?(rc) # def getsockopt name, array rc = __getsockopt__ name, array - + if Util.resultcode_ok?(rc) && (RCVMORE == name || MCAST_LOOP == name) # convert to boolean array[0] = 1 == array[0] end - + rc end # Queues the message for transmission. Message is assumed to conform to the # same public API as #Message. @@ -562,18 +561,18 @@ # cause. Also, the +list+ will not be modified when there was an error. # def recv_strings list, flag = 0 array = [] rc = recvmsgs array, flag - + if Util.resultcode_ok?(rc) array.each do |message| list << message.copy_out_string message.close end end - + rc end # Receive a multipart message as an array of objects # (by default these are instances of Message). @@ -592,30 +591,64 @@ # cause. Also, the +list+ will not be modified when there was an error. # def recvmsgs list, flag = 0 flag = NOBLOCK if noblock?(flag) - parts = [] message = @receiver_klass.new rc = recv message, flag - parts << message - # check rc *first*; necessary because the call to #more_parts? can reset - # the zmq_errno to a weird value, so the zmq_errno that was set on the - # call to #recv gets lost - while Util.resultcode_ok?(rc) && more_parts? - message = @receiver_klass.new - rc = recv message, flag - parts << message + if Util.resultcode_ok?(rc) + list << message + + # check rc *first*; necessary because the call to #more_parts? can reset + # the zmq_errno to a weird value, so the zmq_errno that was set on the + # call to #recv gets lost + while Util.resultcode_ok?(rc) && more_parts? + message = @receiver_klass.new + rc = recv message, flag + + if Util.resultcode_ok?(rc) + list << message + else + message.close + list.each { |msg| msg.close } + list.clear + end + end + else + message.close end - # only append the received parts if there were no errors - # FIXME: - # need to detect EAGAIN if flag is set; EAGAIN means we have read all that we - # can and should return whatever was already read; need a spec! + rc + end + + # Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes + # a +list+ for receiving the message body parts and a +routing_envelope+ + # for receiving the message parts comprising the 0mq routing information. + # + # Returns 0 when all messages were successfully dequeued. + # Returns -1 under two conditions. + # 1. A message could not be dequeued + # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. + # + # With a -1 return code, the user must check ZMQ.errno to determine the + # cause. Also, the +list+ *may* be modified when there was an error. + # + def recv_multipart list, routing_envelope, flag = 0 + parts = [] + rc = recvmsgs parts, flag + if Util.resultcode_ok?(rc) - parts.each { |part| list << part } + routing = true + parts.each do |part| + if routing + routing_envelope << part + routing = part.size > 0 + else + list << part + end + end end rc end @@ -660,11 +693,11 @@ end # class Socket for version2 end # LibZMQ.version2? - if LibZMQ.version3? || LibZMQ.version4? + if LibZMQ.version3? class Socket include CommonSocketBehavior include IdentitySupport # Get the options set on this socket. @@ -702,47 +735,19 @@ # rc = socket.getsockopt(ZMQ::HWM, array) # hwm = array.first if ZMQ::Util.resultcode_ok?(rc) # def getsockopt name, array rc = __getsockopt__ name, array - + if Util.resultcode_ok?(rc) && (RCVMORE == name) # convert to boolean array[0] = 1 == array[0] end - + rc end - # The last message part received is tested to see if it is a label. - # - # Equivalent to calling Socket#getsockopt with ZMQ::RCVLABEL. - # - # Warning: if the call to #getsockopt fails, this method will return - # false and swallow the error. - # - # labels = [] - # message_parts = [] - # message = Message.new - # rc = socket.recv(message) - # if ZMQ::Util.resultcode_ok?(rc) - # label? ? labels.push(message) : message_parts.push(message) - # while more_parts? - # message = Message.new - # if ZMQ::Util.resulcode_ok?(socket.recv(message)) - # label? ? labels.push(message) : message_parts.push(message) - # end - # end - # end - # - def label? - array = [] - rc = getsockopt ZMQ::RCVLABEL, array - - Util.resultcode_ok?(rc) ? array.at(0) : false - end - # Queues the message for transmission. Message is assumed to conform to the # same public API as #Message. # # +flags+ may take two values: # * 0 (default) - blocking operation @@ -794,11 +799,11 @@ # cause. # def send_strings parts, flags = 0 return -1 if !parts || parts.empty? flags = DONTWAIT if dontwait?(flags) - + parts[0..-2].each do |part| rc = send_string part, (flags | ZMQ::SNDMORE) return rc unless Util.resultcode_ok?(rc) end @@ -820,11 +825,11 @@ # cause. # def sendmsgs parts, flags = 0 return -1 if !parts || parts.empty? flags = DONTWAIT if dontwait?(flags) - + parts[0..-2].each do |part| rc = sendmsg part, (flags | ZMQ::SNDMORE) return rc unless Util.resultcode_ok?(rc) end @@ -899,49 +904,76 @@ # removed. # def recv_strings list, flag = 0 array = [] rc = recvmsgs array, flag - + if Util.resultcode_ok?(rc) array.each do |message| list << message.copy_out_string message.close end end - + rc end # Receive a multipart message as an array of objects # (by default these are instances of Message). # # +flag+ may be ZMQ::DONTWAIT. Any other flag will be # removed. # - # Raises the same exceptions as Socket#recv. - # def recvmsgs list, flag = 0 flag = DONTWAIT if dontwait?(flag) - parts = [] message = @receiver_klass.new rc = recvmsg message, flag - parts << message - # check rc *first*; necessary because the call to #more_parts? can reset - # the zmq_errno to a weird value, so the zmq_errno that was set on the - # call to #recv gets lost - while Util.resultcode_ok?(rc) && more_parts? - message = @receiver_klass.new - rc = recvmsg message, flag - parts << message + if Util.resultcode_ok?(rc) + list << message + + # check rc *first*; necessary because the call to #more_parts? can reset + # the zmq_errno to a weird value, so the zmq_errno that was set on the + # call to #recv gets lost + while Util.resultcode_ok?(rc) && more_parts? + message = @receiver_klass.new + rc = recvmsg message, flag + + if Util.resultcode_ok?(rc) + list << message + else + message.close + list.each { |msg| msg.close } + list.clear + end + end + else + message.close end - # only append the received parts if there were no errors + rc + end + + # Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes + # a +list+ for receiving the message body parts and a +routing_envelope+ + # for receiving the message parts comprising the 0mq routing information. + # + def recv_multipart list, routing_envelope, flag = 0 + parts = [] + rc = recvmsgs parts, flag + if Util.resultcode_ok?(rc) - parts.each { |part| list << part } + routing = true + parts.each do |part| + if routing + routing_envelope << part + routing = part.size > 0 + else + list << part + end + end end rc end @@ -953,10 +985,9 @@ end alias :noblock? :dontwait? def int_option? name super || - RCVLABEL == name || RECONNECT_IVL_MAX == name || RCVHWM == name || SNDHWM == name || RATE == name || RECOVERY_IVL == name ||