lib/ffi-rzmq/socket.rb in ffi-rzmq-0.9.0 vs lib/ffi-rzmq/socket.rb in ffi-rzmq-0.9.2
- old
+ new
@@ -32,11 +32,11 @@
# end
#
def self.create context_ptr, type, opts = {:receiver_class => ZMQ::Message}
new(context_ptr, type, opts) rescue nil
end
-
+
# To avoid rescuing exceptions, use the factory method #create for
# all socket creation.
#
# Allocates a socket of type +type+ for sending and receiving data.
#
@@ -66,11 +66,11 @@
#
def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message}
# users may override the classes used for receiving; class must conform to the
# same public API as ZMQ::Message
@receiver_klass = opts[:receiver_class]
-
+
context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context)
unless context_ptr.null?
@socket = LibZMQ.zmq_socket context_ptr, type
if @socket && !@socket.null?
@@ -98,18 +98,17 @@
# ZMQ::MCAST_LOOP (version 2 only)
# ZMQ::LINGER
# ZMQ::RECONNECT_IVL
# ZMQ::BACKLOG
# ZMQ::RECOVER_IVL_MSEC (version 2 only)
- # ZMQ::RECONNECT_IVL_MAX (version 3/4 only)
- # ZMQ::MAXMSGSIZE (version 3/4 only)
- # ZMQ::SNDHWM (version 3/4 only)
- # ZMQ::RCVHWM (version 3/4 only)
- # ZMQ::MULTICAST_HOPS (version 3/4 only)
- # ZMQ::RCVTIMEO (version 3/4 only)
- # ZMQ::SNDTIMEO (version 3/4 only)
- # ZMQ::RCVLABEL (version 3/4 only)
+ # ZMQ::RECONNECT_IVL_MAX (version 3 only)
+ # ZMQ::MAXMSGSIZE (version 3 only)
+ # ZMQ::SNDHWM (version 3 only)
+ # ZMQ::RCVHWM (version 3 only)
+ # ZMQ::MULTICAST_HOPS (version 3 only)
+ # ZMQ::RCVTIMEO (version 3 only)
+ # ZMQ::SNDTIMEO (version 3 only)
#
# Valid +name+ values that take a string +value+ are:
# ZMQ::IDENTITY (version 2/3 only)
# ZMQ::SUBSCRIBE
# ZMQ::UNSUBSCRIBE
@@ -167,11 +166,11 @@
# end
#
def more_parts?
array = []
rc = getsockopt ZMQ::RCVMORE, array
-
+
Util.resultcode_ok?(rc) ? array.at(0) : false
end
# Binds the socket to an +address+.
#
@@ -195,11 +194,11 @@
# Returns 0 upon success *or* when the socket has already been closed.
# Returns -1 when the operation fails. Check ZMQ.errno for the error code.
#
# rc = socket.close
# puts("Given socket was invalid!") unless 0 == rc
- #
+ #
def close
if @socket
remove_finalizer
rc = LibZMQ.zmq_close @socket
@socket = nil
@@ -242,37 +241,37 @@
unless @sockopt_cache[:int64]
length = FFI::MemoryPointer.new :size_t
length.write_int 8
@sockopt_cache[:int64] = [FFI::MemoryPointer.new(:int64), length]
end
-
+
@sockopt_cache[:int64]
elsif int_option?(name)
# int, 0mq assumes int is 4-bytes
unless @sockopt_cache[:int32]
length = FFI::MemoryPointer.new :size_t
length.write_int 4
@sockopt_cache[:int32] = [FFI::MemoryPointer.new(:int32), length]
end
-
+
@sockopt_cache[:int32]
elsif string_option?(name)
length = FFI::MemoryPointer.new :size_t
# could be a string of up to 255 bytes
length.write_int 255
[FFI::MemoryPointer.new(255), length]
-
+
else
# uh oh, someone passed in an unknown option; use a slop buffer
unless @sockopt_cache[:unknown]
length = FFI::MemoryPointer.new :size_t
length.write_int 4
@sockopt_cache[:unknown] = [FFI::MemoryPointer.new(:int32), length]
end
-
+
@sockopt_cache[:unknown]
end
end
def supported_option? name
@@ -384,16 +383,16 @@
# rc = socket.getsockopt(ZMQ::HWM, array)
# hwm = array.first if ZMQ::Util.resultcode_ok?(rc)
#
def getsockopt name, array
rc = __getsockopt__ name, array
-
+
if Util.resultcode_ok?(rc) && (RCVMORE == name || MCAST_LOOP == name)
# convert to boolean
array[0] = 1 == array[0]
end
-
+
rc
end
# Queues the message for transmission. Message is assumed to conform to the
# same public API as #Message.
@@ -562,18 +561,18 @@
# cause. Also, the +list+ will not be modified when there was an error.
#
def recv_strings list, flag = 0
array = []
rc = recvmsgs array, flag
-
+
if Util.resultcode_ok?(rc)
array.each do |message|
list << message.copy_out_string
message.close
end
end
-
+
rc
end
# Receive a multipart message as an array of objects
# (by default these are instances of Message).
@@ -592,30 +591,64 @@
# cause. Also, the +list+ will not be modified when there was an error.
#
def recvmsgs list, flag = 0
flag = NOBLOCK if noblock?(flag)
- parts = []
message = @receiver_klass.new
rc = recv message, flag
- parts << message
- # check rc *first*; necessary because the call to #more_parts? can reset
- # the zmq_errno to a weird value, so the zmq_errno that was set on the
- # call to #recv gets lost
- while Util.resultcode_ok?(rc) && more_parts?
- message = @receiver_klass.new
- rc = recv message, flag
- parts << message
+ if Util.resultcode_ok?(rc)
+ list << message
+
+ # check rc *first*; necessary because the call to #more_parts? can reset
+ # the zmq_errno to a weird value, so the zmq_errno that was set on the
+ # call to #recv gets lost
+ while Util.resultcode_ok?(rc) && more_parts?
+ message = @receiver_klass.new
+ rc = recv message, flag
+
+ if Util.resultcode_ok?(rc)
+ list << message
+ else
+ message.close
+ list.each { |msg| msg.close }
+ list.clear
+ end
+ end
+ else
+ message.close
end
- # only append the received parts if there were no errors
- # FIXME:
- # need to detect EAGAIN if flag is set; EAGAIN means we have read all that we
- # can and should return whatever was already read; need a spec!
+ rc
+ end
+
+ # Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes
+ # a +list+ for receiving the message body parts and a +routing_envelope+
+ # for receiving the message parts comprising the 0mq routing information.
+ #
+ # Returns 0 when all messages were successfully dequeued.
+ # Returns -1 under two conditions.
+ # 1. A message could not be dequeued
+ # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause. Also, the +list+ *may* be modified when there was an error.
+ #
+ def recv_multipart list, routing_envelope, flag = 0
+ parts = []
+ rc = recvmsgs parts, flag
+
if Util.resultcode_ok?(rc)
- parts.each { |part| list << part }
+ routing = true
+ parts.each do |part|
+ if routing
+ routing_envelope << part
+ routing = part.size > 0
+ else
+ list << part
+ end
+ end
end
rc
end
@@ -660,11 +693,11 @@
end # class Socket for version2
end # LibZMQ.version2?
- if LibZMQ.version3? || LibZMQ.version4?
+ if LibZMQ.version3?
class Socket
include CommonSocketBehavior
include IdentitySupport
# Get the options set on this socket.
@@ -702,47 +735,19 @@
# rc = socket.getsockopt(ZMQ::HWM, array)
# hwm = array.first if ZMQ::Util.resultcode_ok?(rc)
#
def getsockopt name, array
rc = __getsockopt__ name, array
-
+
if Util.resultcode_ok?(rc) && (RCVMORE == name)
# convert to boolean
array[0] = 1 == array[0]
end
-
+
rc
end
- # The last message part received is tested to see if it is a label.
- #
- # Equivalent to calling Socket#getsockopt with ZMQ::RCVLABEL.
- #
- # Warning: if the call to #getsockopt fails, this method will return
- # false and swallow the error.
- #
- # labels = []
- # message_parts = []
- # message = Message.new
- # rc = socket.recv(message)
- # if ZMQ::Util.resultcode_ok?(rc)
- # label? ? labels.push(message) : message_parts.push(message)
- # while more_parts?
- # message = Message.new
- # if ZMQ::Util.resulcode_ok?(socket.recv(message))
- # label? ? labels.push(message) : message_parts.push(message)
- # end
- # end
- # end
- #
- def label?
- array = []
- rc = getsockopt ZMQ::RCVLABEL, array
-
- Util.resultcode_ok?(rc) ? array.at(0) : false
- end
-
# Queues the message for transmission. Message is assumed to conform to the
# same public API as #Message.
#
# +flags+ may take two values:
# * 0 (default) - blocking operation
@@ -794,11 +799,11 @@
# cause.
#
def send_strings parts, flags = 0
return -1 if !parts || parts.empty?
flags = DONTWAIT if dontwait?(flags)
-
+
parts[0..-2].each do |part|
rc = send_string part, (flags | ZMQ::SNDMORE)
return rc unless Util.resultcode_ok?(rc)
end
@@ -820,11 +825,11 @@
# cause.
#
def sendmsgs parts, flags = 0
return -1 if !parts || parts.empty?
flags = DONTWAIT if dontwait?(flags)
-
+
parts[0..-2].each do |part|
rc = sendmsg part, (flags | ZMQ::SNDMORE)
return rc unless Util.resultcode_ok?(rc)
end
@@ -899,49 +904,76 @@
# removed.
#
def recv_strings list, flag = 0
array = []
rc = recvmsgs array, flag
-
+
if Util.resultcode_ok?(rc)
array.each do |message|
list << message.copy_out_string
message.close
end
end
-
+
rc
end
# Receive a multipart message as an array of objects
# (by default these are instances of Message).
#
# +flag+ may be ZMQ::DONTWAIT. Any other flag will be
# removed.
#
- # Raises the same exceptions as Socket#recv.
- #
def recvmsgs list, flag = 0
flag = DONTWAIT if dontwait?(flag)
- parts = []
message = @receiver_klass.new
rc = recvmsg message, flag
- parts << message
- # check rc *first*; necessary because the call to #more_parts? can reset
- # the zmq_errno to a weird value, so the zmq_errno that was set on the
- # call to #recv gets lost
- while Util.resultcode_ok?(rc) && more_parts?
- message = @receiver_klass.new
- rc = recvmsg message, flag
- parts << message
+ if Util.resultcode_ok?(rc)
+ list << message
+
+ # check rc *first*; necessary because the call to #more_parts? can reset
+ # the zmq_errno to a weird value, so the zmq_errno that was set on the
+ # call to #recv gets lost
+ while Util.resultcode_ok?(rc) && more_parts?
+ message = @receiver_klass.new
+ rc = recvmsg message, flag
+
+ if Util.resultcode_ok?(rc)
+ list << message
+ else
+ message.close
+ list.each { |msg| msg.close }
+ list.clear
+ end
+ end
+ else
+ message.close
end
- # only append the received parts if there were no errors
+ rc
+ end
+
+ # Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes
+ # a +list+ for receiving the message body parts and a +routing_envelope+
+ # for receiving the message parts comprising the 0mq routing information.
+ #
+ def recv_multipart list, routing_envelope, flag = 0
+ parts = []
+ rc = recvmsgs parts, flag
+
if Util.resultcode_ok?(rc)
- parts.each { |part| list << part }
+ routing = true
+ parts.each do |part|
+ if routing
+ routing_envelope << part
+ routing = part.size > 0
+ else
+ list << part
+ end
+ end
end
rc
end
@@ -953,10 +985,9 @@
end
alias :noblock? :dontwait?
def int_option? name
super ||
- RCVLABEL == name ||
RECONNECT_IVL_MAX == name ||
RCVHWM == name ||
SNDHWM == name ||
RATE == name ||
RECOVERY_IVL == name ||