lib/ffi-rzmq/socket.rb in ffi-rzmq-0.8.2 vs lib/ffi-rzmq/socket.rb in ffi-rzmq-0.9.0
- old
+ new
@@ -1,475 +1,985 @@
module ZMQ
- ZMQ_SOCKET_STR = 'zmq_socket'.freeze unless defined? ZMQ_SOCKET_STR
- ZMQ_SETSOCKOPT_STR = 'zmq_setsockopt'.freeze
- ZMQ_GETSOCKOPT_STR = 'zmq_getsockopt'.freeze
- ZMQ_BIND_STR = 'zmq_bind'.freeze
- ZMQ_CONNECT_STR = 'zmq_connect'.freeze
- ZMQ_CLOSE_STR = 'zmq_close'.freeze
- ZMQ_SEND_STR = 'zmq_send'.freeze
- ZMQ_RECV_STR = 'zmq_recv'.freeze
-
- class Socket
+ module CommonSocketBehavior
include ZMQ::Util
attr_reader :socket, :name
# Allocates a socket of type +type+ for sending and receiving data.
#
# +type+ can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB,
- # ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH,
+ # ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP,
# ZMQ::DEALER or ZMQ::ROUTER.
#
# By default, this class uses ZMQ::Message for manual
# memory management. For automatic garbage collection of received messages,
# it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
#
+ # sock = Socket.create(Context.create, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
+ #
+ # Advanced users may want to replace the receiver class with their
+ # own custom class. The custom class must conform to the same public API
+ # as ZMQ::Message.
+ #
+ # Creation of a new Socket object can return nil when socket creation
+ # fails.
+ #
+ # if (socket = Socket.new(context.pointer, ZMQ::REQ))
+ # ...
+ # else
+ # STDERR.puts "Socket creation failed"
+ # end
+ #
+ def self.create context_ptr, type, opts = {:receiver_class => ZMQ::Message}
+ new(context_ptr, type, opts) rescue nil
+ end
+
+ # To avoid rescuing exceptions, use the factory method #create for
+ # all socket creation.
+ #
+ # Allocates a socket of type +type+ for sending and receiving data.
+ #
+ # +type+ can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB,
+ # ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP,
+ # ZMQ::DEALER or ZMQ::ROUTER.
+ #
+ # By default, this class uses ZMQ::Message for manual
+ # memory management. For automatic garbage collection of received messages,
+ # it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
+ #
# sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
#
# Advanced users may want to replace the receiver class with their
# own custom class. The custom class must conform to the same public API
# as ZMQ::Message.
#
- # Can raise two kinds of exceptions depending on the error.
- # ContextError:: Raised when a socket operation is attempted on a terminated
- # #Context. See #ContextError.
- # SocketError:: See all of the possibilities in the docs for #SocketError.
+ # Creation of a new Socket object can raise an exception. This occurs when the
+ # +context_ptr+ is null or when the allocation of the 0mq socket within the
+ # context fails.
#
+ # begin
+ # socket = Socket.new(context.pointer, ZMQ::REQ)
+ # rescue ContextError => e
+ # # error handling
+ # end
+ #
def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message}
# users may override the classes used for receiving; class must conform to the
# same public API as ZMQ::Message
@receiver_klass = opts[:receiver_class]
+
+ context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context)
unless context_ptr.null?
@socket = LibZMQ.zmq_socket context_ptr, type
- if @socket
- error_check ZMQ_SOCKET_STR, @socket.null? ? 1 : 0
+ if @socket && !@socket.null?
@name = SocketTypeNameMap[type]
else
- raise ContextError.new ZMQ_SOCKET_STR, 0, ETERM, "Socket pointer was null"
+ raise ContextError.new 'zmq_socket', 0, ETERM, "Socket pointer was null"
end
else
- raise ContextError.new ZMQ_SOCKET_STR, 0, ETERM, "Context pointer was null"
+ raise ContextError.new 'zmq_socket', 0, ETERM, "Context pointer was null"
end
@sockopt_cache = {}
define_finalizer
end
# Set the queue options on this socket.
#
- # Valid +option_name+ values that take a numeric +option_value+ are:
+ # Valid +name+ values that take a numeric +value+ are:
# ZMQ::HWM
- # ZMQ::SWAP
+ # ZMQ::SWAP (version 2 only)
# ZMQ::AFFINITY
# ZMQ::RATE
# ZMQ::RECOVERY_IVL
- # ZMQ::MCAST_LOOP
+ # ZMQ::MCAST_LOOP (version 2 only)
# ZMQ::LINGER
# ZMQ::RECONNECT_IVL
# ZMQ::BACKLOG
- # ZMQ::RECOVER_IVL_MSEC
+ # ZMQ::RECOVER_IVL_MSEC (version 2 only)
+ # ZMQ::RECONNECT_IVL_MAX (version 3/4 only)
+ # ZMQ::MAXMSGSIZE (version 3/4 only)
+ # ZMQ::SNDHWM (version 3/4 only)
+ # ZMQ::RCVHWM (version 3/4 only)
+ # ZMQ::MULTICAST_HOPS (version 3/4 only)
+ # ZMQ::RCVTIMEO (version 3/4 only)
+ # ZMQ::SNDTIMEO (version 3/4 only)
+ # ZMQ::RCVLABEL (version 3/4 only)
#
- # Valid +option_name+ values that take a string +option_value+ are:
- # ZMQ::IDENTITY
+ # Valid +name+ values that take a string +value+ are:
+ # ZMQ::IDENTITY (version 2/3 only)
# ZMQ::SUBSCRIBE
# ZMQ::UNSUBSCRIBE
#
- # Can raise two kinds of exceptions depending on the error.
- # ContextError:: Raised when a socket operation is attempted on a terminated
- # #Context. See #ContextError.
- # SocketError:: See all of the possibilities in the docs for #SocketError.
+ # Returns 0 when the operation completed successfully.
+ # Returns -1 when this operation failed.
#
- def setsockopt option_name, option_value, option_len = nil
- option_value = sanitize_value option_name, option_value
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ # rc = socket.setsockopt(ZMQ::LINGER, 1_000)
+ # ZMQ::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")
+ #
+ def setsockopt name, value, length = nil
+ if long_long_option?(name)
+ length = 8
+ pointer = LibC.malloc length
+ pointer.write_long_long value
- begin
- case option_name
- when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC
- option_len = 8 # all of these options are defined as int64_t or uint64_t
- option_value_ptr = LibC.malloc option_len
- option_value_ptr.write_long_long option_value
+ elsif int_option?(name)
+ length = 4
+ pointer = LibC.malloc length
+ pointer.write_int value
- when LINGER, RECONNECT_IVL, BACKLOG
- option_len = 4 # hard-code "int" length to 4 bytes
- option_value_ptr = LibC.malloc option_len
- option_value_ptr.write_int option_value
+ elsif string_option?(name)
+ length ||= value.size
- when IDENTITY, SUBSCRIBE, UNSUBSCRIBE
- option_len ||= option_value.size
+ # note: not checking errno for failed memory allocations :(
+ pointer = LibC.malloc length
+ pointer.write_string value
+ end
- # note: not checking errno for failed memory allocations :(
- option_value_ptr = LibC.malloc option_len
- option_value_ptr.write_string option_value
+ rc = LibZMQ.zmq_setsockopt @socket, name, pointer, length
+ LibC.free(pointer) unless pointer.nil? || pointer.null?
+ rc
+ end
- else
- # we didn't understand the passed option argument
- # will force a raise due to EINVAL being non-zero
- error_check ZMQ_SETSOCKOPT_STR, EINVAL
- end
-
- result_code = LibZMQ.zmq_setsockopt @socket, option_name, option_value_ptr, option_len
- error_check ZMQ_SETSOCKOPT_STR, result_code
- ensure
- LibC.free option_value_ptr unless option_value_ptr.nil? || option_value_ptr.null?
- end
+ # Convenience method for checking on additional message parts.
+ #
+ # Equivalent to calling Socket#getsockopt with ZMQ::RCVMORE.
+ #
+ # Warning: if the call to #getsockopt fails, this method will return
+ # false and swallow the error.
+ #
+ # message_parts = []
+ # message = Message.new
+ # rc = socket.recv(message)
+ # if ZMQ::Util.resultcode_ok?(rc)
+ # message_parts << message
+ # while more_parts?
+ # message = Message.new
+ # rc = socket.recv(message)
+ # message_parts.push(message) if resulcode_ok?(rc)
+ # end
+ # end
+ #
+ def more_parts?
+ array = []
+ rc = getsockopt ZMQ::RCVMORE, array
+
+ Util.resultcode_ok?(rc) ? array.at(0) : false
end
- # Get the options set on this socket. Returns a value dependent upon
- # the +option_name+ requested.
+ # Binds the socket to an +address+.
#
- # Valid +option_name+ values and their return types:
- # ZMQ::RCVMORE - boolean
- # ZMQ::HWM - integer
- # ZMQ::SWAP - integer
- # ZMQ::AFFINITY - bitmap in an integer
- # ZMQ::IDENTITY - string
- # ZMQ::RATE - integer
- # ZMQ::RECOVERY_IVL - integer
- # ZMQ::MCAST_LOOP - boolean
- # ZMQ::SNDBUF - integer
- # ZMQ::RCVBUF - integer
- # ZMQ::FD - fd in an integer
- # ZMQ::EVENTS - bitmap integer
- # ZMQ::LINGER - integer measured in milliseconds
- # ZMQ::RECONNECT_IVL - integer measured in milliseconds
- # ZMQ::BACKLOG - integer
- # ZMQ::RECOVER_IVL_MSEC - integer measured in milliseconds
+ # socket.bind("tcp://127.0.0.1:5555")
#
- # Can raise two kinds of exceptions depending on the error.
- # ContextError:: Raised when a socket operation is attempted on a terminated
- # #Context. See #ContextError.
- # SocketError:: See all of the possibilities in the docs for #SocketError.
+ def bind address
+ LibZMQ.zmq_bind @socket, address
+ end
+
+ # Connects the socket to an +address+.
#
- def getsockopt option_name
- begin
- option_value = FFI::MemoryPointer.new :pointer
- option_length = FFI::MemoryPointer.new(:size_t) rescue FFI::MemoryPointer.new(:ulong)
+ # socket.connect("tcp://127.0.0.1:5555")
+ #
+ def connect address
+ rc = LibZMQ.zmq_connect @socket, address
+ end
- unless [
- TYPE, RCVMORE, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, IDENTITY,
- SNDBUF, RCVBUF, FD, EVENTS, LINGER, RECONNECT_IVL, BACKLOG, RECOVERY_IVL_MSEC
- ].include? option_name
- # we didn't understand the passed option argument
- # will force a raise
- error_check ZMQ_SETSOCKOPT_STR, -1
+ # Closes the socket. Any unprocessed messages in queue are sent or dropped
+ # depending upon the value of the socket option ZMQ::LINGER.
+ #
+ # Returns 0 upon success *or* when the socket has already been closed.
+ # Returns -1 when the operation fails. Check ZMQ.errno for the error code.
+ #
+ # rc = socket.close
+ # puts("Given socket was invalid!") unless 0 == rc
+ #
+ def close
+ if @socket
+ remove_finalizer
+ rc = LibZMQ.zmq_close @socket
+ @socket = nil
+ release_cache
+ rc
+ else
+ 0
end
+ end
- option_value, option_length = alloc_temp_sockopt_buffers option_name
- result_code = LibZMQ.zmq_getsockopt @socket, option_name, option_value, option_length
- error_check ZMQ_GETSOCKOPT_STR, result_code
- ret = 0
+ private
- case option_name
- when RCVMORE, MCAST_LOOP
- # boolean return
- ret = option_value.read_long_long != 0
- when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC
- ret = option_value.read_long_long
- when TYPE, LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS
- ret = option_value.read_int
- when IDENTITY
- ret = option_value.read_string(option_length.read_long_long)
+ def __getsockopt__ name, array
+ value, length = sockopt_buffers name
+
+ rc = LibZMQ.zmq_getsockopt @socket, name, value, length
+
+ if Util.resultcode_ok?(rc)
+ result = if int_option?(name)
+ value.read_int
+ elsif long_long_option?(name)
+ value.read_long_long
+ elsif string_option?(name)
+ value.read_string(length.read_int)
+ end
+
+ array << result
end
- ret
+ rc
end
- end
- # Convenience method for checking on additional message parts.
- #
- # Equivalent to Socket#getsockopt ZMQ::RCVMORE
- #
- def more_parts?
- getsockopt ZMQ::RCVMORE
- end
+ # Calls to ZMQ.getsockopt require us to pass in some pointers. We can cache and save those buffers
+ # for subsequent calls. This is a big perf win for calling RCVMORE which happens quite often.
+ # Cannot save the buffer for the IDENTITY.
+ def sockopt_buffers name
+ if long_long_option?(name)
+ # int64_t or uint64_t
+ unless @sockopt_cache[:int64]
+ length = FFI::MemoryPointer.new :size_t
+ length.write_int 8
+ @sockopt_cache[:int64] = [FFI::MemoryPointer.new(:int64), length]
+ end
+
+ @sockopt_cache[:int64]
- # Convenience method for getting the value of the socket IDENTITY.
- #
- def identity
- getsockopt ZMQ::IDENTITY
- end
+ elsif int_option?(name)
+ # int, 0mq assumes int is 4-bytes
+ unless @sockopt_cache[:int32]
+ length = FFI::MemoryPointer.new :size_t
+ length.write_int 4
+ @sockopt_cache[:int32] = [FFI::MemoryPointer.new(:int32), length]
+ end
+
+ @sockopt_cache[:int32]
- # Convenience method for setting the value of the socket IDENTITY.
- #
- def identity= value
- setsockopt ZMQ::IDENTITY, value.to_s
- end
+ elsif string_option?(name)
+ length = FFI::MemoryPointer.new :size_t
+ # could be a string of up to 255 bytes
+ length.write_int 255
+ [FFI::MemoryPointer.new(255), length]
+
+ else
+ # uh oh, someone passed in an unknown option; use a slop buffer
+ unless @sockopt_cache[:unknown]
+ length = FFI::MemoryPointer.new :size_t
+ length.write_int 4
+ @sockopt_cache[:unknown] = [FFI::MemoryPointer.new(:int32), length]
+ end
+
+ @sockopt_cache[:unknown]
+ end
+ end
- # Can raise two kinds of exceptions depending on the error.
- # ContextError:: Raised when a socket operation is attempted on a terminated
- # #Context. See #ContextError.
- # SocketError:: See all of the possibilities in the docs for #SocketError.
- #
- def bind address
- result_code = LibZMQ.zmq_bind @socket, address
- error_check ZMQ_BIND_STR, result_code
- end
+ def supported_option? name
+ int_option?(name) || long_long_option?(name) || string_option?(name)
+ end
- # Can raise two kinds of exceptions depending on the error.
- # ContextError:: Raised when a socket operation is attempted on a terminated
- # #Context. See #ContextError.
- # SocketError:: See all of the possibilities in the docs for #SocketError.
- #
- def connect address
- result_code = LibZMQ.zmq_connect @socket, address
- error_check ZMQ_CONNECT_STR, result_code
- end
+ def int_option? name
+ EVENTS == name ||
+ LINGER == name ||
+ RECONNECT_IVL == name ||
+ FD == name ||
+ TYPE == name ||
+ BACKLOG == name
+ end
- # Closes the socket. Any unprocessed messages in queue are sent or dropped
- # depending upon the value of the socket option ZMQ::LINGER.
- #
- def close
- if @socket
- remove_finalizer
- result_code = LibZMQ.zmq_close @socket
- error_check ZMQ_CLOSE_STR, result_code
- @socket = nil
- release_cache
+ def string_option? name
+ SUBSCRIBE == name ||
+ UNSUBSCRIBE == name
end
- end
- # Queues the message for transmission. Message is assumed to conform to the
- # same public API as #Message.
- #
- # +flags+ may take two values:
- # * 0 (default) - blocking operation
- # * ZMQ::NOBLOCK - non-blocking operation
- # * ZMQ::SNDMORE - this message is part of a multi-part message
- #
- # Returns true when the message was successfully enqueued.
- # Returns false under two conditions.
- # 1. The message could not be enqueued
- # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN.
- #
- # The application code is responsible for handling the +message+ object
- # lifecycle when #send returns.
- #
- # Can raise two kinds of exceptions depending on the error.
- # ContextError:: Raised when a socket operation is attempted on a terminated
- # #Context. See #ContextError.
- # SocketError:: See all of the possibilities in the docs for #SocketError.
- #
- def send message, flags = 0
- begin
- result_code = LibZMQ.zmq_send @socket, message.address, flags
+ def long_long_option? name
+ RCVMORE == name ||
+ AFFINITY == name
+ end
- # when the flag isn't set, do a normal error check
- # when set, check to see if the message was successfully queued
- queued = noblock?(flags) ? error_check_nonblock(result_code) : error_check(ZMQ_SEND_STR, result_code)
+ def unsupported_setsock_option? name
+ RCVMORE == name
end
- # true if sent, false if failed/EAGAIN
- queued
- end
+ def unsupported_getsock_option? name
+ UNSUBSCRIBE == name ||
+ SUBSCRIBE == name
+ end
- # Helper method to make a new #Message instance out of the +message_string+ passed
- # in for transmission.
- #
- # +flags+ may be ZMQ::NOBLOCK.
- #
- # Can raise two kinds of exceptions depending on the error.
- # ContextError:: Raised when a socket operation is attempted on a terminated
- # #Context. See #ContextError.
- # SocketError:: See all of the possibilities in the docs for #SocketError.
- #
- def send_string message_string, flags = 0
- message = Message.new message_string
- result_code = send_and_close message, flags
+ def release_cache
+ @sockopt_cache.clear
+ end
+ end # module CommonSocketBehavior
- result_code
- end
- # Send a sequence of strings as a multipart message out of the +parts+
- # passed in for transmission. Every element of +parts+ should be
- # a String.
- #
- # +flags+ may be ZMQ::NOBLOCK.
- #
- # Raises the same exceptions as Socket#send.
- #
- def send_strings parts, flags = 0
- return false if !parts || parts.empty?
+ module IdentitySupport
- parts[0...-1].each do |part|
- return false unless send_string part, flags | ZMQ::SNDMORE
+ # Convenience method for getting the value of the socket IDENTITY.
+ #
+ def identity
+ array = []
+ getsockopt IDENTITY, array
+ array.at(0)
end
- send_string parts[-1], flags
- end
-
- # Sends a message. This will automatically close the +message+ for both successful
- # and failed sends.
- #
- # Raises the same exceptions as Socket#send
- #
- def send_and_close message, flags = 0
- begin
- result_code = send message, flags
- ensure
- message.close
+ # Convenience method for setting the value of the socket IDENTITY.
+ #
+ def identity=(value)
+ setsockopt IDENTITY, value.to_s
end
- result_code
- end
- # Dequeues a message from the underlying queue. By default, this is a blocking operation.
- #
- # +flags+ may take two values:
- # 0 (default) - blocking operation
- # ZMQ::NOBLOCK - non-blocking operation
- #
- # Returns a true when it successfully dequeues one from the queue. Also, the +message+
- # object is populated by the library with a data buffer containing the received
- # data.
- #
- # Returns nil when a message could not be dequeued *and* +flags+ is set
- # with ZMQ::NOBLOCK. The +message+ object is not modified in this situation.
- #
- # The application code is *not* responsible for handling the +message+ object lifecycle
- # when #recv raises an exception. The #recv method takes ownership of the
- # +message+ and its associated buffers. A failed call will
- # release the data buffers assigned to the +message+.
- #
- # Can raise two kinds of exceptions depending on the error.
- # ContextError:: Raised when a socket operation is attempted on a terminated
- # #Context. See #ContextError.
- # SocketError:: See all of the possibilities in the docs for #SocketError.
- #
- def recv message, flags = 0
- begin
- dequeued = _recv message, flags
- rescue ZeroMQError
- message.close
- raise
+
+ private
+
+ def string_option? name
+ super ||
+ IDENTITY == name
end
+ end # module IdentitySupport
- dequeued ? true : nil
- end
- # Helper method to make a new #Message instance and convert its payload
- # to a string.
- #
- # +flags+ may be ZMQ::NOBLOCK.
- #
- # Can raise two kinds of exceptions depending on the error.
- # ContextError:: Raised when a socket operation is attempted on a terminated
- # #Context. See #ContextError.
- # SocketError:: See all of the possibilities in the docs for #SocketError.
- #
- def recv_string flags = 0
- message = @receiver_klass.new
+ if LibZMQ.version2?
- begin
- dequeued = _recv message, flags
+ class Socket
+ # Inclusion order is *important* since later modules may have a call
+ # to #super. We want those calls to go up the chain in a particular
+ # order
+ include CommonSocketBehavior
+ include IdentitySupport
- if dequeued
- message.copy_out_string
- else
- nil
+ # Get the options set on this socket.
+ #
+ # +name+ determines the socket option to request
+ # +array+ should be an empty array; a result of the proper type
+ # (numeric, string, boolean) will be inserted into
+ # the first position.
+ #
+ # Valid +option_name+ values:
+ # ZMQ::RCVMORE - true or false
+ # ZMQ::HWM - integer
+ # ZMQ::SWAP - integer
+ # ZMQ::AFFINITY - bitmap in an integer
+ # ZMQ::IDENTITY - string
+ # ZMQ::RATE - integer
+ # ZMQ::RECOVERY_IVL - integer
+ # ZMQ::MCAST_LOOP - true or false
+ # ZMQ::SNDBUF - integer
+ # ZMQ::RCVBUF - integer
+ # ZMQ::FD - fd in an integer
+ # ZMQ::EVENTS - bitmap integer
+ # ZMQ::LINGER - integer measured in milliseconds
+ # ZMQ::RECONNECT_IVL - integer measured in milliseconds
+ # ZMQ::BACKLOG - integer
+ # ZMQ::RECOVER_IVL_MSEC - integer measured in milliseconds
+ #
+ # Returns 0 when the operation completed successfully.
+ # Returns -1 when this operation failed.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ # # retrieve high water mark
+ # array = []
+ # rc = socket.getsockopt(ZMQ::HWM, array)
+ # hwm = array.first if ZMQ::Util.resultcode_ok?(rc)
+ #
+ def getsockopt name, array
+ rc = __getsockopt__ name, array
+
+ if Util.resultcode_ok?(rc) && (RCVMORE == name || MCAST_LOOP == name)
+ # convert to boolean
+ array[0] = 1 == array[0]
+ end
+
+ rc
end
- ensure
- message.close
- end
- end
- # Receive a multipart message as a list of strings.
- #
- # +flags+ may be ZMQ::NOBLOCK.
- #
- # Raises the same exceptions as Socket#recv.
- #
- def recv_strings flags = 0
- parts = []
- parts << recv_string(flags)
- parts << recv_string(flags) while more_parts?
- parts
- end
+ # Queues the message for transmission. Message is assumed to conform to the
+ # same public API as #Message.
+ #
+ # +flags+ may take two values:
+ # * 0 (default) - blocking operation
+ # * ZMQ::NOBLOCK - non-blocking operation
+ # * ZMQ::SNDMORE - this message is part of a multi-part message
+ #
+ # Returns 0 when the message was successfully enqueued.
+ # Returns -1 under two conditions.
+ # 1. The message could not be enqueued
+ # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ # The application code is responsible for handling the +message+ object
+ # lifecycle when #send returns. Regardless of the return code, the user
+ # is responsible for calling message.close to free the memory in use.
+ #
+ def send message, flags = 0
+ LibZMQ.zmq_send @socket, message.address, flags
+ end
- private
-
- def noblock? flag
- (NOBLOCK & flag) == NOBLOCK
- end
+ # Helper method to make a new #Message instance out of the +message_string+ passed
+ # in for transmission.
+ #
+ # +flags+ may be ZMQ::NOBLOCK.
+ #
+ # Returns 0 when the message was successfully enqueued.
+ # Returns -1 under two conditions.
+ # 1. The message could not be enqueued
+ # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ def send_string message_string, flags = 0
+ message = Message.new message_string
+ send_and_close message, flags
+ end
- def _recv message, flags = 0
- result_code = LibZMQ.zmq_recv @socket, message.address, flags
+ # Send a sequence of strings as a multipart message out of the +parts+
+ # passed in for transmission. Every element of +parts+ should be
+ # a String.
+ #
+ # +flags+ may be ZMQ::NOBLOCK.
+ #
+ # Returns 0 when the message was successfully enqueued.
+ # Returns -1 under two conditions.
+ # 1. The message could not be enqueued
+ # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ def send_strings parts, flags = 0
+ return -1 if !parts || parts.empty?
- if noblock?(flags)
- error_check_nonblock(result_code)
- else
- error_check(ZMQ_RECV_STR, result_code)
- end
- end
+ parts[0..-2].each do |part|
+ rc = send_string part, flags | ZMQ::SNDMORE
+ return rc unless Util.resultcode_ok?(rc)
+ end
- # Calls to ZMQ.getsockopt require us to pass in some pointers. We can cache and save those buffers
- # for subsequent calls. This is a big perf win for calling RCVMORE which happens quite often.
- # Cannot save the buffer for the IDENTITY.
- def alloc_temp_sockopt_buffers option_name
- case option_name
- when RCVMORE, MCAST_LOOP, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC
- # int64_t
- unless @sockopt_cache[:int64]
- length = FFI::MemoryPointer.new :int64
- length.write_long_long 8
- @sockopt_cache[:int64] = [FFI::MemoryPointer.new(:int64), length]
+ send_string parts[-1], flags
end
- @sockopt_cache[:int64]
- when TYPE, LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS
- # int, 0mq assumes int is 4-bytes
- unless @sockopt_cache[:int32]
- length = FFI::MemoryPointer.new :int32
- length.write_int 4
- @sockopt_cache[:int32] = [FFI::MemoryPointer.new(:int32), length]
+ # Send a sequence of messages as a multipart message out of the +parts+
+ # passed in for transmission. Every element of +parts+ should be
+ # a Message (or subclass).
+ #
+ # +flags+ may be ZMQ::NOBLOCK.
+ #
+ # Returns 0 when the message was successfully enqueued.
+ # Returns -1 under two conditions.
+ # 1. The message could not be enqueued
+ # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ def sendmsgs parts, flags = 0
+ return -1 if !parts || parts.empty?
+
+ parts[0..-2].each do |part|
+ rc = send part, flags | ZMQ::SNDMORE
+ return rc unless Util.resultcode_ok?(rc)
+ end
+
+ send parts[-1], flags
end
- @sockopt_cache[:int32]
- when IDENTITY
- length = FFI::MemoryPointer.new :int64
- # could be a string of up to 255 bytes
- length.write_long_long 255
- [FFI::MemoryPointer.new(255), length]
- end
- end
+ # Sends a message. This will automatically close the +message+ for both successful
+ # and failed sends.
+ #
+ # Returns 0 when the message was successfully enqueued.
+ # Returns -1 under two conditions.
+ # 1. The message could not be enqueued
+ # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ def send_and_close message, flags = 0
+ rc = send message, flags
+ message.close
+ rc
+ end
- def sanitize_value option_name, option_value
- case option_name
- when HWM, AFFINITY, SNDBUF, RCVBUF
- option_value.abs
- when MCAST_LOOP
- option_value ? 1 : 0
- else
- option_value
- end
- end
-
- def release_cache
- @sockopt_cache.clear
- end
+ # Dequeues a message from the underlying queue. By default, this is a blocking operation.
+ #
+ # +flags+ may take two values:
+ # 0 (default) - blocking operation
+ # ZMQ::NOBLOCK - non-blocking operation
+ #
+ # Returns 0 when the message was successfully dequeued.
+ # Returns -1 under two conditions.
+ # 1. The message could not be dequeued
+ # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ # The application code is responsible for handling the +message+ object lifecycle
+ # when #recv returns an error code.
+ #
+ def recv message, flags = 0
+ LibZMQ.zmq_recv @socket, message.address, flags
+ end
- # require a minimum of 0mq 2.1.0 to support socket finalizers; it contains important
- # fixes for sockets and threads so that a garbage collector thread can successfully
- # reap this resource without crashing
- if Util.minimum_api?([2, 1, 0])
- def define_finalizer
- ObjectSpace.define_finalizer(self, self.class.close(@socket))
- end
- else
- def define_finalizer
- # no op
- end
- end
+ # Converts the received message to a string and replaces the +string+ arg
+ # contents.
+ #
+ # +string+ should be an empty string, .e.g. ''
+ # +flags+ may be ZMQ::NOBLOCK.
+ #
+ # Returns 0 when the message was successfully dequeued.
+ # Returns -1 under two conditions.
+ # 1. The message could not be dequeued
+ # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ def recv_string string, flags = 0
+ message = @receiver_klass.new
+ rc = recv message, flags
+ string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
+ message.close
+ rc
+ end
- def remove_finalizer
- ObjectSpace.undefine_finalizer self
- end
+ # Receive a multipart message as a list of strings.
+ #
+ # +list+ should be an object that responds to #append or #<< so received
+ # strings can be appended to it
+ # +flag+ may be ZMQ::NOBLOCK. Any other flag will be removed
+ #
+ # Returns 0 when all messages were successfully dequeued.
+ # Returns -1 under two conditions.
+ # 1. A message could not be dequeued
+ # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause. Also, the +list+ will not be modified when there was an error.
+ #
+ def recv_strings list, flag = 0
+ array = []
+ rc = recvmsgs array, flag
+
+ if Util.resultcode_ok?(rc)
+ array.each do |message|
+ list << message.copy_out_string
+ message.close
+ end
+ end
+
+ rc
+ end
- def self.close socket
- Proc.new { LibZMQ.zmq_close socket }
- end
-end # class Socket
+ # Receive a multipart message as an array of objects
+ # (by default these are instances of Message).
+ #
+ # +list+ should be an object that responds to #append or #<< so received
+ # messages can be appended to it
+ # +flag+ may be ZMQ::NOBLOCK. Any other flag will be
+ # removed.
+ #
+ # Returns 0 when all messages were successfully dequeued.
+ # Returns -1 under two conditions.
+ # 1. A message could not be dequeued
+ # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause. Also, the +list+ will not be modified when there was an error.
+ #
+ def recvmsgs list, flag = 0
+ flag = NOBLOCK if noblock?(flag)
+
+ parts = []
+ message = @receiver_klass.new
+ rc = recv message, flag
+ parts << message
+
+ # check rc *first*; necessary because the call to #more_parts? can reset
+ # the zmq_errno to a weird value, so the zmq_errno that was set on the
+ # call to #recv gets lost
+ while Util.resultcode_ok?(rc) && more_parts?
+ message = @receiver_klass.new
+ rc = recv message, flag
+ parts << message
+ end
+
+ # only append the received parts if there were no errors
+ # FIXME:
+ # need to detect EAGAIN if flag is set; EAGAIN means we have read all that we
+ # can and should return whatever was already read; need a spec!
+ if Util.resultcode_ok?(rc)
+ parts.each { |part| list << part }
+ end
+
+ rc
+ end
+
+
+ private
+
+ def noblock? flags
+ (NOBLOCK & flags) == NOBLOCK
+ end
+
+ def int_option? name
+ super ||
+ RECONNECT_IVL_MAX == name
+ end
+
+ def long_long_option? name
+ super ||
+ HWM == name ||
+ SWAP == name ||
+ RATE == name ||
+ RECOVERY_IVL == name ||
+ RECOVERY_IVL_MSEC == name ||
+ MCAST_LOOP == name ||
+ SNDBUF == name ||
+ RCVBUF == name
+ end
+
+ # these finalizer-related methods cannot live in the CommonSocketBehavior
+ # module; they *must* be in the class definition directly
+
+ def define_finalizer
+ ObjectSpace.define_finalizer(self, self.class.close(@socket))
+ end
+
+ def remove_finalizer
+ ObjectSpace.undefine_finalizer self
+ end
+
+ def self.close socket
+ Proc.new { LibZMQ.zmq_close socket }
+ end
+ end # class Socket for version2
+
+ end # LibZMQ.version2?
+
+
+ if LibZMQ.version3? || LibZMQ.version4?
+ class Socket
+ include CommonSocketBehavior
+ include IdentitySupport
+
+ # Get the options set on this socket.
+ #
+ # +name+ determines the socket option to request
+ # +array+ should be an empty array; a result of the proper type
+ # (numeric, string, boolean) will be inserted into
+ # the first position.
+ #
+ # Valid +option_name+ values:
+ # ZMQ::RCVMORE - true or false
+ # ZMQ::HWM - integer
+ # ZMQ::SWAP - integer
+ # ZMQ::AFFINITY - bitmap in an integer
+ # ZMQ::IDENTITY - string
+ # ZMQ::RATE - integer
+ # ZMQ::RECOVERY_IVL - integer
+ # ZMQ::SNDBUF - integer
+ # ZMQ::RCVBUF - integer
+ # ZMQ::FD - fd in an integer
+ # ZMQ::EVENTS - bitmap integer
+ # ZMQ::LINGER - integer measured in milliseconds
+ # ZMQ::RECONNECT_IVL - integer measured in milliseconds
+ # ZMQ::BACKLOG - integer
+ # ZMQ::RECOVER_IVL_MSEC - integer measured in milliseconds
+ #
+ # Returns 0 when the operation completed successfully.
+ # Returns -1 when this operation failed.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ # # retrieve high water mark
+ # array = []
+ # rc = socket.getsockopt(ZMQ::HWM, array)
+ # hwm = array.first if ZMQ::Util.resultcode_ok?(rc)
+ #
+ def getsockopt name, array
+ rc = __getsockopt__ name, array
+
+ if Util.resultcode_ok?(rc) && (RCVMORE == name)
+ # convert to boolean
+ array[0] = 1 == array[0]
+ end
+
+ rc
+ end
+
+ # The last message part received is tested to see if it is a label.
+ #
+ # Equivalent to calling Socket#getsockopt with ZMQ::RCVLABEL.
+ #
+ # Warning: if the call to #getsockopt fails, this method will return
+ # false and swallow the error.
+ #
+ # labels = []
+ # message_parts = []
+ # message = Message.new
+ # rc = socket.recv(message)
+ # if ZMQ::Util.resultcode_ok?(rc)
+ # label? ? labels.push(message) : message_parts.push(message)
+ # while more_parts?
+ # message = Message.new
+ # if ZMQ::Util.resulcode_ok?(socket.recv(message))
+ # label? ? labels.push(message) : message_parts.push(message)
+ # end
+ # end
+ # end
+ #
+ def label?
+ array = []
+ rc = getsockopt ZMQ::RCVLABEL, array
+
+ Util.resultcode_ok?(rc) ? array.at(0) : false
+ end
+
+ # Queues the message for transmission. Message is assumed to conform to the
+ # same public API as #Message.
+ #
+ # +flags+ may take two values:
+ # * 0 (default) - blocking operation
+ # * ZMQ::DONTWAIT - non-blocking operation
+ # * ZMQ::SNDMORE - this message is part of a multi-part message
+ #
+ # Returns 0 when the message was successfully enqueued.
+ # Returns -1 under two conditions.
+ # 1. The message could not be enqueued
+ # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ def sendmsg message, flags = 0
+ LibZMQ.zmq_sendmsg @socket, message.address, flags
+ end
+
+ # Helper method to make a new #Message instance out of the +string+ passed
+ # in for transmission.
+ #
+ # +flags+ may be ZMQ::DONTWAIT and ZMQ::SNDMORE.
+ #
+ # Returns 0 when the message was successfully enqueued.
+ # Returns -1 under two conditions.
+ # 1. The message could not be enqueued
+ # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ def send_string string, flags = 0
+ message = Message.new string
+ send_and_close message, flags
+ end
+
+ # Send a sequence of strings as a multipart message out of the +parts+
+ # passed in for transmission. Every element of +parts+ should be
+ # a String.
+ #
+ # +flags+ may be ZMQ::DONTWAIT.
+ #
+ # Returns 0 when the messages were successfully enqueued.
+ # Returns -1 under two conditions.
+ # 1. A message could not be enqueued
+ # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ def send_strings parts, flags = 0
+ return -1 if !parts || parts.empty?
+ flags = DONTWAIT if dontwait?(flags)
+
+ parts[0..-2].each do |part|
+ rc = send_string part, (flags | ZMQ::SNDMORE)
+ return rc unless Util.resultcode_ok?(rc)
+ end
+
+ send_string parts[-1], flags
+ end
+
+ # Send a sequence of messages as a multipart message out of the +parts+
+ # passed in for transmission. Every element of +parts+ should be
+ # a Message (or subclass).
+ #
+ # +flags+ may be ZMQ::DONTWAIT.
+ #
+ # Returns 0 when the messages were successfully enqueued.
+ # Returns -1 under two conditions.
+ # 1. A message could not be enqueued
+ # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ def sendmsgs parts, flags = 0
+ return -1 if !parts || parts.empty?
+ flags = DONTWAIT if dontwait?(flags)
+
+ parts[0..-2].each do |part|
+ rc = sendmsg part, (flags | ZMQ::SNDMORE)
+ return rc unless Util.resultcode_ok?(rc)
+ end
+
+ sendmsg parts[-1], flags
+ end
+
+ # Sends a message. This will automatically close the +message+ for both successful
+ # and failed sends.
+ #
+ # Returns 0 when the message was successfully enqueued.
+ # Returns -1 under two conditions.
+ # 1. The message could not be enqueued
+ # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ def send_and_close message, flags = 0
+ rc = sendmsg message, flags
+ message.close
+ rc
+ end
+
+ # Dequeues a message from the underlying queue. By default, this is a blocking operation.
+ #
+ # +flags+ may take two values:
+ # 0 (default) - blocking operation
+ # ZMQ::DONTWAIT - non-blocking operation
+ #
+ # Returns 0 when the message was successfully dequeued.
+ # Returns -1 under two conditions.
+ # 1. The message could not be dequeued
+ # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ # The application code is responsible for handling the +message+ object lifecycle
+ # when #recv returns an error code.
+ #
+ def recvmsg message, flags = 0
+ LibZMQ.zmq_recvmsg @socket, message.address, flags
+ end
+
+ # Helper method to make a new #Message instance and convert its payload
+ # to a string.
+ #
+ # +flags+ may be ZMQ::DONTWAIT.
+ #
+ # Returns 0 when the message was successfully dequeued.
+ # Returns -1 under two conditions.
+ # 1. The message could not be dequeued
+ # 2. When +flags+ is set with ZMQ::DONTWAIT and the socket returned EAGAIN.
+ #
+ # With a -1 return code, the user must check ZMQ.errno to determine the
+ # cause.
+ #
+ # The application code is responsible for handling the +message+ object lifecycle
+ # when #recv returns an error code.
+ #
+ def recv_string string, flags = 0
+ message = @receiver_klass.new
+ rc = recvmsg message, flags
+ string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
+ message.close
+ rc
+ end
+
+ # Receive a multipart message as a list of strings.
+ #
+ # +flag+ may be ZMQ::DONTWAIT. Any other flag will be
+ # removed.
+ #
+ def recv_strings list, flag = 0
+ array = []
+ rc = recvmsgs array, flag
+
+ if Util.resultcode_ok?(rc)
+ array.each do |message|
+ list << message.copy_out_string
+ message.close
+ end
+ end
+
+ rc
+ end
+
+ # Receive a multipart message as an array of objects
+ # (by default these are instances of Message).
+ #
+ # +flag+ may be ZMQ::DONTWAIT. Any other flag will be
+ # removed.
+ #
+ # Raises the same exceptions as Socket#recv.
+ #
+ def recvmsgs list, flag = 0
+ flag = DONTWAIT if dontwait?(flag)
+
+ parts = []
+ message = @receiver_klass.new
+ rc = recvmsg message, flag
+ parts << message
+
+ # check rc *first*; necessary because the call to #more_parts? can reset
+ # the zmq_errno to a weird value, so the zmq_errno that was set on the
+ # call to #recv gets lost
+ while Util.resultcode_ok?(rc) && more_parts?
+ message = @receiver_klass.new
+ rc = recvmsg message, flag
+ parts << message
+ end
+
+ # only append the received parts if there were no errors
+ if Util.resultcode_ok?(rc)
+ parts.each { |part| list << part }
+ end
+
+ rc
+ end
+
+
+ private
+
+ def dontwait? flags
+ (DONTWAIT & flags) == DONTWAIT
+ end
+ alias :noblock? :dontwait?
+
+ def int_option? name
+ super ||
+ RCVLABEL == name ||
+ RECONNECT_IVL_MAX == name ||
+ RCVHWM == name ||
+ SNDHWM == name ||
+ RATE == name ||
+ RECOVERY_IVL == name ||
+ SNDBUF == name ||
+ RCVBUF == name
+ end
+
+ # these finalizer-related methods cannot live in the CommonSocketBehavior
+ # module; they *must* be in the class definition directly
+
+ def define_finalizer
+ ObjectSpace.define_finalizer(self, self.class.close(@socket))
+ end
+
+ def remove_finalizer
+ ObjectSpace.undefine_finalizer self
+ end
+
+ def self.close socket
+ Proc.new { LibZMQ.zmq_close socket }
+ end
+ end # Socket for version3
+ end # LibZMQ.version3?
end # module ZMQ