lib/ffi-rzmq/socket.rb in ffi-rzmq-0.5.1 vs lib/ffi-rzmq/socket.rb in ffi-rzmq-0.6.0

- old
+ new

@@ -11,33 +11,42 @@ ZMQ_RECV_STR = 'zmq_recv'.freeze class Socket include ZMQ::Util - attr_reader :socket + attr_reader :socket, :name - # By default, this class uses ZMQ::Message for regular Ruby - # memory management. + # Allocates a socket of type +type+ for sending and receiving data. # # +type+ can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, # ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, # ZMQ::XREQ or ZMQ::XREP. # + # By default, this class uses ZMQ::Message for manual + # memory management. For automatic garbage collection of received messages, + # it is possible to override the :receiver_class to use ZMQ::ManagedMessage. + # + # sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage) + # + # Advanced users may want to replace the receiver class with their + # own custom class. The custom class must conform to the same public API + # as ZMQ::Message. + # # Can raise two kinds of exceptions depending on the error. # ContextError:: Raised when a socket operation is attempted on a terminated # #Context. See #ContextError. # SocketError:: See all of the possibilities in the docs for #SocketError. # - def initialize context_ptr, type - # maybe at some point we'll want to allow users to override this with their - # own classes? Or is this a YAGNI mistake? - @sender_klass = ZMQ::Message - @receiver_klass = ZMQ::Message + def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message} + # users may override the classes used for receiving; class must conform to the + # same public API as ZMQ::Message + @receiver_klass = opts[:receiver_class] unless context_ptr.null? @socket = LibZMQ.zmq_socket context_ptr, type error_check ZMQ_SOCKET_STR, @socket.null? ? 1 : 0 + @name = SocketTypeNameMap[type] else raise ContextError.new ZMQ_SOCKET_STR, 0, ETERM, "Context pointer was null" end #define_finalizer @@ -62,28 +71,31 @@ # ContextError:: Raised when a socket operation is attempted on a terminated # #Context. See #ContextError. # SocketError:: See all of the possibilities in the docs for #SocketError. # def setsockopt option_name, option_value, option_len = nil + option_value = sanitize_value option_name, option_value + option_len ||= option_value.size + begin case option_name - when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP - option_value_ptr = LibC.malloc option_value.size + when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF + option_value_ptr = LibC.malloc option_len option_value_ptr.write_long option_value when IDENTITY, SUBSCRIBE, UNSUBSCRIBE # note: not checking errno for failed memory allocations :( - option_value_ptr = LibC.malloc option_value.size + option_value_ptr = LibC.malloc option_len option_value_ptr.write_string option_value else # we didn't understand the passed option argument # will force a raise due to EINVAL being non-zero error_check ZMQ_SETSOCKOPT_STR, EINVAL end - result_code = LibZMQ.zmq_setsockopt @socket, option_name, option_value_ptr, option_len || option_value.size + result_code = LibZMQ.zmq_setsockopt @socket, option_name, option_value_ptr, option_len error_check ZMQ_SETSOCKOPT_STR, result_code ensure LibC.free option_value_ptr unless option_value_ptr.nil? || option_value_ptr.null? end end @@ -186,26 +198,27 @@ remove_finalizer result_code = LibZMQ.zmq_close @socket error_check ZMQ_CLOSE_STR, result_code end - # Queues the message for transmission. Message is assumed to be an instance or - # subclass of #Message. + # Queues the message for transmission. Message is assumed to conform to the + # same public API as #Message. # # +flags+ may take two values: # * 0 (default) - blocking operation # * ZMQ::NOBLOCK - non-blocking operation # * ZMQ::SNDMORE - this message is part of a multi-part message # # Returns true when the message was successfully enqueued. - # Returns false when the message could not be enqueued *and* +flags+ is set - # with ZMQ::NOBLOCK. + # Returns false under two conditions. + # 1. The message could not be enqueued + # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN. # # The application code is *not* responsible for handling the +message+ object - # lifecycle when #send return ZMQ::NOBLOCK or it raises an exception. The + # lifecycle when #send returns successfully or it raises an exception. The # #send method takes ownership of the +message+ and its associated buffers. - # A failed call will release the +message+ data buffer. + # Both successful and failed calls will release the +message+ data buffer. # # Again, once a +message+ object has been passed to this method, # do not try to access its #data buffer anymore. The 0mq library now owns it. # # Can raise two kinds of exceptions depending on the error. @@ -237,11 +250,11 @@ # ContextError:: Raised when a socket operation is attempted on a terminated # #Context. See #ContextError. # SocketError:: See all of the possibilities in the docs for #SocketError. # def send_string message_string, flags = 0 - message = @sender_klass.new + message = Message.new message.copy_in_string message_string result = send message, flags result end @@ -323,9 +336,20 @@ [FFI::MemoryPointer.new(:int64), length] when IDENTITY # could be a string of up to 255 bytes length.write_long_long 255 [FFI::MemoryPointer.new(255), length] + end + end + + def sanitize_value option_name, option_value + case option_name + when HWM, AFFINITY, SNDBUF, RCVBUF + option_value.abs + when MCAST_LOOP + option_value ? 1 : 0 + else + option_value end end def define_finalizer ObjectSpace.define_finalizer(self, self.class.close(@socket))