lib/ffi-rzmq/socket.rb in ffi-rzmq-0.5.1 vs lib/ffi-rzmq/socket.rb in ffi-rzmq-0.6.0
- old
+ new
@@ -11,33 +11,42 @@
ZMQ_RECV_STR = 'zmq_recv'.freeze
class Socket
include ZMQ::Util
- attr_reader :socket
+ attr_reader :socket, :name
- # By default, this class uses ZMQ::Message for regular Ruby
- # memory management.
+ # Allocates a socket of type +type+ for sending and receiving data.
#
# +type+ can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB,
# ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH,
# ZMQ::XREQ or ZMQ::XREP.
#
+ # By default, this class uses ZMQ::Message for manual
+ # memory management. For automatic garbage collection of received messages,
+ # it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
+ #
+ # sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
+ #
+ # Advanced users may want to replace the receiver class with their
+ # own custom class. The custom class must conform to the same public API
+ # as ZMQ::Message.
+ #
# Can raise two kinds of exceptions depending on the error.
# ContextError:: Raised when a socket operation is attempted on a terminated
# #Context. See #ContextError.
# SocketError:: See all of the possibilities in the docs for #SocketError.
#
- def initialize context_ptr, type
- # maybe at some point we'll want to allow users to override this with their
- # own classes? Or is this a YAGNI mistake?
- @sender_klass = ZMQ::Message
- @receiver_klass = ZMQ::Message
+ def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message}
+ # users may override the classes used for receiving; class must conform to the
+ # same public API as ZMQ::Message
+ @receiver_klass = opts[:receiver_class]
unless context_ptr.null?
@socket = LibZMQ.zmq_socket context_ptr, type
error_check ZMQ_SOCKET_STR, @socket.null? ? 1 : 0
+ @name = SocketTypeNameMap[type]
else
raise ContextError.new ZMQ_SOCKET_STR, 0, ETERM, "Context pointer was null"
end
#define_finalizer
@@ -62,28 +71,31 @@
# ContextError:: Raised when a socket operation is attempted on a terminated
# #Context. See #ContextError.
# SocketError:: See all of the possibilities in the docs for #SocketError.
#
def setsockopt option_name, option_value, option_len = nil
+ option_value = sanitize_value option_name, option_value
+ option_len ||= option_value.size
+
begin
case option_name
- when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP
- option_value_ptr = LibC.malloc option_value.size
+ when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF
+ option_value_ptr = LibC.malloc option_len
option_value_ptr.write_long option_value
when IDENTITY, SUBSCRIBE, UNSUBSCRIBE
# note: not checking errno for failed memory allocations :(
- option_value_ptr = LibC.malloc option_value.size
+ option_value_ptr = LibC.malloc option_len
option_value_ptr.write_string option_value
else
# we didn't understand the passed option argument
# will force a raise due to EINVAL being non-zero
error_check ZMQ_SETSOCKOPT_STR, EINVAL
end
- result_code = LibZMQ.zmq_setsockopt @socket, option_name, option_value_ptr, option_len || option_value.size
+ result_code = LibZMQ.zmq_setsockopt @socket, option_name, option_value_ptr, option_len
error_check ZMQ_SETSOCKOPT_STR, result_code
ensure
LibC.free option_value_ptr unless option_value_ptr.nil? || option_value_ptr.null?
end
end
@@ -186,26 +198,27 @@
remove_finalizer
result_code = LibZMQ.zmq_close @socket
error_check ZMQ_CLOSE_STR, result_code
end
- # Queues the message for transmission. Message is assumed to be an instance or
- # subclass of #Message.
+ # Queues the message for transmission. Message is assumed to conform to the
+ # same public API as #Message.
#
# +flags+ may take two values:
# * 0 (default) - blocking operation
# * ZMQ::NOBLOCK - non-blocking operation
# * ZMQ::SNDMORE - this message is part of a multi-part message
#
# Returns true when the message was successfully enqueued.
- # Returns false when the message could not be enqueued *and* +flags+ is set
- # with ZMQ::NOBLOCK.
+ # Returns false under two conditions.
+ # 1. The message could not be enqueued
+ # 2. When +flags+ is set with ZMQ::NOBLOCK and the socket returned EAGAIN.
#
# The application code is *not* responsible for handling the +message+ object
- # lifecycle when #send return ZMQ::NOBLOCK or it raises an exception. The
+ # lifecycle when #send returns successfully or it raises an exception. The
# #send method takes ownership of the +message+ and its associated buffers.
- # A failed call will release the +message+ data buffer.
+ # Both successful and failed calls will release the +message+ data buffer.
#
# Again, once a +message+ object has been passed to this method,
# do not try to access its #data buffer anymore. The 0mq library now owns it.
#
# Can raise two kinds of exceptions depending on the error.
@@ -237,11 +250,11 @@
# ContextError:: Raised when a socket operation is attempted on a terminated
# #Context. See #ContextError.
# SocketError:: See all of the possibilities in the docs for #SocketError.
#
def send_string message_string, flags = 0
- message = @sender_klass.new
+ message = Message.new
message.copy_in_string message_string
result = send message, flags
result
end
@@ -323,9 +336,20 @@
[FFI::MemoryPointer.new(:int64), length]
when IDENTITY
# could be a string of up to 255 bytes
length.write_long_long 255
[FFI::MemoryPointer.new(255), length]
+ end
+ end
+
+ def sanitize_value option_name, option_value
+ case option_name
+ when HWM, AFFINITY, SNDBUF, RCVBUF
+ option_value.abs
+ when MCAST_LOOP
+ option_value ? 1 : 0
+ else
+ option_value
end
end
def define_finalizer
ObjectSpace.define_finalizer(self, self.class.close(@socket))