lib/ffi-rzmq/socket.rb in ffi-rzmq-0.6.0 vs lib/ffi-rzmq/socket.rb in ffi-rzmq-0.7.0

- old
+ new

@@ -25,11 +25,11 @@ # memory management. For automatic garbage collection of received messages, # it is possible to override the :receiver_class to use ZMQ::ManagedMessage. # # sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage) # - # Advanced users may want to replace the receiver class with their + # Advanced users may want to replace the receiver class with their # own custom class. The custom class must conform to the same public API # as ZMQ::Message. # # Can raise two kinds of exceptions depending on the error. # ContextError:: Raised when a socket operation is attempted on a terminated @@ -41,17 +41,23 @@ # same public API as ZMQ::Message @receiver_klass = opts[:receiver_class] unless context_ptr.null? @socket = LibZMQ.zmq_socket context_ptr, type - error_check ZMQ_SOCKET_STR, @socket.null? ? 1 : 0 - @name = SocketTypeNameMap[type] + if @socket + error_check ZMQ_SOCKET_STR, @socket.null? ? 1 : 0 + @name = SocketTypeNameMap[type] + else + raise ContextError.new ZMQ_SOCKET_STR, 0, ETERM, "Socket pointer was null" + end else raise ContextError.new ZMQ_SOCKET_STR, 0, ETERM, "Context pointer was null" end - #define_finalizer + @sockopt_cache = {} + + define_finalizer end # Set the queue options on this socket. # # Valid +option_name+ values that take a numeric +option_value+ are: @@ -59,10 +65,14 @@ # ZMQ::SWAP # ZMQ::AFFINITY # ZMQ::RATE # ZMQ::RECOVERY_IVL # ZMQ::MCAST_LOOP + # ZMQ::LINGER + # ZMQ::RECONNECT_IVL + # ZMQ::BACKLOG + # ZMQ::RECOVER_IVL_MSEC # # Valid +option_name+ values that take a string +option_value+ are: # ZMQ::IDENTITY # ZMQ::SUBSCRIBE # ZMQ::UNSUBSCRIBE @@ -73,17 +83,22 @@ # SocketError:: See all of the possibilities in the docs for #SocketError. # def setsockopt option_name, option_value, option_len = nil option_value = sanitize_value option_name, option_value option_len ||= option_value.size - + begin case option_name - when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF + when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC option_value_ptr = LibC.malloc option_len option_value_ptr.write_long option_value + when LINGER, RECONNECT_IVL, BACKLOG + option_len = 4 # hard-code "int" length to 4 bytes + option_value_ptr = LibC.malloc option_len + option_value_ptr.write_int option_value + when IDENTITY, SUBSCRIBE, UNSUBSCRIBE # note: not checking errno for failed memory allocations :( option_value_ptr = LibC.malloc option_len option_value_ptr.write_string option_value @@ -112,10 +127,16 @@ # ZMQ::RATE - integer # ZMQ::RECOVERY_IVL - integer # ZMQ::MCAST_LOOP - boolean # ZMQ::SNDBUF - integer # ZMQ::RCVBUF - integer + # ZMQ::FD - fd in an integer + # ZMQ::EVENTS - bitmap integer + # ZMQ::LINGER - integer measured in milliseconds + # ZMQ::RECONNECT_IVL - integer measured in milliseconds + # ZMQ::BACKLOG - integer + # ZMQ::RECOVER_IVL_MSEC - integer measured in milliseconds # # Can raise two kinds of exceptions depending on the error. # ContextError:: Raised when a socket operation is attempted on a terminated # #Context. See #ContextError. # SocketError:: See all of the possibilities in the docs for #SocketError. @@ -123,12 +144,14 @@ def getsockopt option_name begin option_value = FFI::MemoryPointer.new :pointer option_length = FFI::MemoryPointer.new :size_t - unless [RCVMORE, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, - IDENTITY, SNDBUF, RCVBUF].include? option_name + unless [ + RCVMORE, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, IDENTITY, + SNDBUF, RCVBUF, FD, EVENTS, LINGER, RECONNECT_IVL, BACKLOG, RECOVERY_IVL_MSEC + ].include? option_name # we didn't understand the passed option argument # will force a raise error_check ZMQ_SETSOCKOPT_STR, -1 end @@ -140,12 +163,14 @@ case option_name when RCVMORE, MCAST_LOOP # boolean return ret = option_value.read_long_long != 0 - when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF + when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC ret = option_value.read_long_long + when LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS + ret = option_value.read_int when IDENTITY ret = option_value.read_string(option_length.read_long_long) end ret @@ -190,16 +215,20 @@ def connect address result_code = LibZMQ.zmq_connect @socket, address error_check ZMQ_CONNECT_STR, result_code end - # Closes the socket. Any unprocessed messages in queue are dropped. + # Closes the socket. Any unprocessed messages in queue are sent or dropped + # depending upon the value of the socket option ZMQ::LINGER. # def close - remove_finalizer - result_code = LibZMQ.zmq_close @socket - error_check ZMQ_CLOSE_STR, result_code + if @socket + remove_finalizer + result_code = LibZMQ.zmq_close @socket + error_check ZMQ_CLOSE_STR, result_code + @socket = nil + end end # Queues the message for transmission. Message is assumed to conform to the # same public API as #Message. # @@ -324,25 +353,41 @@ result_code = LibZMQ.zmq_recv @socket, message.address, flags flags != NOBLOCK ? error_check(ZMQ_RECV_STR, result_code) : error_check_nonblock(result_code) end + # Calls to ZMQ.getsockopt require us to pass in some pointers. We can cache and save those buffers + # for subsequent calls. This is a big perf win for calling RCVMORE which happens quite often. + # Cannot save the buffer for the IDENTITY. def alloc_temp_sockopt_buffers option_name - length = FFI::MemoryPointer.new :int64 - case option_name - when RCVMORE, MCAST_LOOP, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF + when RCVMORE, MCAST_LOOP, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC # int64_t - length.write_long_long 8 - [FFI::MemoryPointer.new(:int64), length] + unless @sockopt_cache[:int64] + length = FFI::MemoryPointer.new :int64 + length.write_long_long 8 + @sockopt_cache[:int64] = [FFI::MemoryPointer.new(:int64), length] + end + @sockopt_cache[:int64] + + when LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS + # int, 0mq assumes int is 4-bytes + unless @sockopt_cache[:int32] + length = FFI::MemoryPointer.new :int32 + length.write_int 4 + @sockopt_cache[:int32] = [FFI::MemoryPointer.new(:int32), length] + end + @sockopt_cache[:int32] + when IDENTITY + length = FFI::MemoryPointer.new :int64 # could be a string of up to 255 bytes length.write_long_long 255 [FFI::MemoryPointer.new(255), length] end end - + def sanitize_value option_name, option_value case option_name when HWM, AFFINITY, SNDBUF, RCVBUF option_value.abs when MCAST_LOOP @@ -350,11 +395,20 @@ else option_value end end - def define_finalizer - ObjectSpace.define_finalizer(self, self.class.close(@socket)) + # require a minimum of 0mq 2.1.0 to support socket finalizers; it contains important + # fixes for sockets and threads so that a garbage collector thread can successfully + # reap this resource without crashing + if Util.minimum_api?([2, 1, 0]) + def define_finalizer + ObjectSpace.define_finalizer(self, self.class.close(@socket)) + end + else + def define_finalizer + # no op + end end def remove_finalizer ObjectSpace.undefine_finalizer self end