lib/ffi-rzmq/socket.rb in ffi-rzmq-0.6.0 vs lib/ffi-rzmq/socket.rb in ffi-rzmq-0.7.0
- old
+ new
@@ -25,11 +25,11 @@
# memory management. For automatic garbage collection of received messages,
# it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
#
# sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
#
- # Advanced users may want to replace the receiver class with their
+ # Advanced users may want to replace the receiver class with their
# own custom class. The custom class must conform to the same public API
# as ZMQ::Message.
#
# Can raise two kinds of exceptions depending on the error.
# ContextError:: Raised when a socket operation is attempted on a terminated
@@ -41,17 +41,23 @@
# same public API as ZMQ::Message
@receiver_klass = opts[:receiver_class]
unless context_ptr.null?
@socket = LibZMQ.zmq_socket context_ptr, type
- error_check ZMQ_SOCKET_STR, @socket.null? ? 1 : 0
- @name = SocketTypeNameMap[type]
+ if @socket
+ error_check ZMQ_SOCKET_STR, @socket.null? ? 1 : 0
+ @name = SocketTypeNameMap[type]
+ else
+ raise ContextError.new ZMQ_SOCKET_STR, 0, ETERM, "Socket pointer was null"
+ end
else
raise ContextError.new ZMQ_SOCKET_STR, 0, ETERM, "Context pointer was null"
end
- #define_finalizer
+ @sockopt_cache = {}
+
+ define_finalizer
end
# Set the queue options on this socket.
#
# Valid +option_name+ values that take a numeric +option_value+ are:
@@ -59,10 +65,14 @@
# ZMQ::SWAP
# ZMQ::AFFINITY
# ZMQ::RATE
# ZMQ::RECOVERY_IVL
# ZMQ::MCAST_LOOP
+ # ZMQ::LINGER
+ # ZMQ::RECONNECT_IVL
+ # ZMQ::BACKLOG
+ # ZMQ::RECOVER_IVL_MSEC
#
# Valid +option_name+ values that take a string +option_value+ are:
# ZMQ::IDENTITY
# ZMQ::SUBSCRIBE
# ZMQ::UNSUBSCRIBE
@@ -73,17 +83,22 @@
# SocketError:: See all of the possibilities in the docs for #SocketError.
#
def setsockopt option_name, option_value, option_len = nil
option_value = sanitize_value option_name, option_value
option_len ||= option_value.size
-
+
begin
case option_name
- when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF
+ when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC
option_value_ptr = LibC.malloc option_len
option_value_ptr.write_long option_value
+ when LINGER, RECONNECT_IVL, BACKLOG
+ option_len = 4 # hard-code "int" length to 4 bytes
+ option_value_ptr = LibC.malloc option_len
+ option_value_ptr.write_int option_value
+
when IDENTITY, SUBSCRIBE, UNSUBSCRIBE
# note: not checking errno for failed memory allocations :(
option_value_ptr = LibC.malloc option_len
option_value_ptr.write_string option_value
@@ -112,10 +127,16 @@
# ZMQ::RATE - integer
# ZMQ::RECOVERY_IVL - integer
# ZMQ::MCAST_LOOP - boolean
# ZMQ::SNDBUF - integer
# ZMQ::RCVBUF - integer
+ # ZMQ::FD - fd in an integer
+ # ZMQ::EVENTS - bitmap integer
+ # ZMQ::LINGER - integer measured in milliseconds
+ # ZMQ::RECONNECT_IVL - integer measured in milliseconds
+ # ZMQ::BACKLOG - integer
+ # ZMQ::RECOVER_IVL_MSEC - integer measured in milliseconds
#
# Can raise two kinds of exceptions depending on the error.
# ContextError:: Raised when a socket operation is attempted on a terminated
# #Context. See #ContextError.
# SocketError:: See all of the possibilities in the docs for #SocketError.
@@ -123,12 +144,14 @@
def getsockopt option_name
begin
option_value = FFI::MemoryPointer.new :pointer
option_length = FFI::MemoryPointer.new :size_t
- unless [RCVMORE, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP,
- IDENTITY, SNDBUF, RCVBUF].include? option_name
+ unless [
+ RCVMORE, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, IDENTITY,
+ SNDBUF, RCVBUF, FD, EVENTS, LINGER, RECONNECT_IVL, BACKLOG, RECOVERY_IVL_MSEC
+ ].include? option_name
# we didn't understand the passed option argument
# will force a raise
error_check ZMQ_SETSOCKOPT_STR, -1
end
@@ -140,12 +163,14 @@
case option_name
when RCVMORE, MCAST_LOOP
# boolean return
ret = option_value.read_long_long != 0
- when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF
+ when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC
ret = option_value.read_long_long
+ when LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS
+ ret = option_value.read_int
when IDENTITY
ret = option_value.read_string(option_length.read_long_long)
end
ret
@@ -190,16 +215,20 @@
def connect address
result_code = LibZMQ.zmq_connect @socket, address
error_check ZMQ_CONNECT_STR, result_code
end
- # Closes the socket. Any unprocessed messages in queue are dropped.
+ # Closes the socket. Any unprocessed messages in queue are sent or dropped
+ # depending upon the value of the socket option ZMQ::LINGER.
#
def close
- remove_finalizer
- result_code = LibZMQ.zmq_close @socket
- error_check ZMQ_CLOSE_STR, result_code
+ if @socket
+ remove_finalizer
+ result_code = LibZMQ.zmq_close @socket
+ error_check ZMQ_CLOSE_STR, result_code
+ @socket = nil
+ end
end
# Queues the message for transmission. Message is assumed to conform to the
# same public API as #Message.
#
@@ -324,25 +353,41 @@
result_code = LibZMQ.zmq_recv @socket, message.address, flags
flags != NOBLOCK ? error_check(ZMQ_RECV_STR, result_code) : error_check_nonblock(result_code)
end
+ # Calls to ZMQ.getsockopt require us to pass in some pointers. We can cache and save those buffers
+ # for subsequent calls. This is a big perf win for calling RCVMORE which happens quite often.
+ # Cannot save the buffer for the IDENTITY.
def alloc_temp_sockopt_buffers option_name
- length = FFI::MemoryPointer.new :int64
-
case option_name
- when RCVMORE, MCAST_LOOP, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF
+ when RCVMORE, MCAST_LOOP, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC
# int64_t
- length.write_long_long 8
- [FFI::MemoryPointer.new(:int64), length]
+ unless @sockopt_cache[:int64]
+ length = FFI::MemoryPointer.new :int64
+ length.write_long_long 8
+ @sockopt_cache[:int64] = [FFI::MemoryPointer.new(:int64), length]
+ end
+ @sockopt_cache[:int64]
+
+ when LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS
+ # int, 0mq assumes int is 4-bytes
+ unless @sockopt_cache[:int32]
+ length = FFI::MemoryPointer.new :int32
+ length.write_int 4
+ @sockopt_cache[:int32] = [FFI::MemoryPointer.new(:int32), length]
+ end
+ @sockopt_cache[:int32]
+
when IDENTITY
+ length = FFI::MemoryPointer.new :int64
# could be a string of up to 255 bytes
length.write_long_long 255
[FFI::MemoryPointer.new(255), length]
end
end
-
+
def sanitize_value option_name, option_value
case option_name
when HWM, AFFINITY, SNDBUF, RCVBUF
option_value.abs
when MCAST_LOOP
@@ -350,11 +395,20 @@
else
option_value
end
end
- def define_finalizer
- ObjectSpace.define_finalizer(self, self.class.close(@socket))
+ # require a minimum of 0mq 2.1.0 to support socket finalizers; it contains important
+ # fixes for sockets and threads so that a garbage collector thread can successfully
+ # reap this resource without crashing
+ if Util.minimum_api?([2, 1, 0])
+ def define_finalizer
+ ObjectSpace.define_finalizer(self, self.class.close(@socket))
+ end
+ else
+ def define_finalizer
+ # no op
+ end
end
def remove_finalizer
ObjectSpace.undefine_finalizer self
end