lib/ffi-rzmq/socket.rb in ffi-rzmq-0.8.0 vs lib/ffi-rzmq/socket.rb in ffi-rzmq-0.8.2
- old
+ new
@@ -82,24 +82,26 @@
# #Context. See #ContextError.
# SocketError:: See all of the possibilities in the docs for #SocketError.
#
def setsockopt option_name, option_value, option_len = nil
option_value = sanitize_value option_name, option_value
- option_len ||= option_value.size
begin
case option_name
when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC
+ option_len = 8 # all of these options are defined as int64_t or uint64_t
option_value_ptr = LibC.malloc option_len
- option_value_ptr.write_long option_value
+ option_value_ptr.write_long_long option_value
when LINGER, RECONNECT_IVL, BACKLOG
option_len = 4 # hard-code "int" length to 4 bytes
option_value_ptr = LibC.malloc option_len
option_value_ptr.write_int option_value
when IDENTITY, SUBSCRIBE, UNSUBSCRIBE
+ option_len ||= option_value.size
+
# note: not checking errno for failed memory allocations :(
option_value_ptr = LibC.malloc option_len
option_value_ptr.write_string option_value
else
@@ -145,11 +147,11 @@
begin
option_value = FFI::MemoryPointer.new :pointer
option_length = FFI::MemoryPointer.new(:size_t) rescue FFI::MemoryPointer.new(:ulong)
unless [
- RCVMORE, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, IDENTITY,
+ TYPE, RCVMORE, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, IDENTITY,
SNDBUF, RCVBUF, FD, EVENTS, LINGER, RECONNECT_IVL, BACKLOG, RECOVERY_IVL_MSEC
].include? option_name
# we didn't understand the passed option argument
# will force a raise
error_check ZMQ_SETSOCKOPT_STR, -1
@@ -165,11 +167,11 @@
when RCVMORE, MCAST_LOOP
# boolean return
ret = option_value.read_long_long != 0
when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC
ret = option_value.read_long_long
- when LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS
+ when TYPE, LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS
ret = option_value.read_int
when IDENTITY
ret = option_value.read_string(option_length.read_long_long)
end
@@ -255,11 +257,11 @@
begin
result_code = LibZMQ.zmq_send @socket, message.address, flags
# when the flag isn't set, do a normal error check
# when set, check to see if the message was successfully queued
- queued = flags != NOBLOCK ? error_check(ZMQ_SEND_STR, result_code) : error_check_nonblock(result_code)
+ queued = noblock?(flags) ? error_check_nonblock(result_code) : error_check(ZMQ_SEND_STR, result_code)
end
# true if sent, false if failed/EAGAIN
queued
end
@@ -278,11 +280,29 @@
message = Message.new message_string
result_code = send_and_close message, flags
result_code
end
-
+
+ # Send a sequence of strings as a multipart message out of the +parts+
+ # passed in for transmission. Every element of +parts+ should be
+ # a String.
+ #
+ # +flags+ may be ZMQ::NOBLOCK.
+ #
+ # Raises the same exceptions as Socket#send.
+ #
+ def send_strings parts, flags = 0
+ return false if !parts || parts.empty?
+
+ parts[0...-1].each do |part|
+ return false unless send_string part, flags | ZMQ::SNDMORE
+ end
+
+ send_string parts[-1], flags
+ end
+
# Sends a message. This will automatically close the +message+ for both successful
# and failed sends.
#
# Raises the same exceptions as Socket#send
#
@@ -353,10 +373,23 @@
ensure
message.close
end
end
+ # Receive a multipart message as a list of strings.
+ #
+ # +flags+ may be ZMQ::NOBLOCK.
+ #
+ # Raises the same exceptions as Socket#recv.
+ #
+ def recv_strings flags = 0
+ parts = []
+ parts << recv_string(flags)
+ parts << recv_string(flags) while more_parts?
+ parts
+ end
+
private
def noblock? flag
(NOBLOCK & flag) == NOBLOCK
end
@@ -383,10 +416,10 @@
length.write_long_long 8
@sockopt_cache[:int64] = [FFI::MemoryPointer.new(:int64), length]
end
@sockopt_cache[:int64]
- when LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS
+ when TYPE, LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS
# int, 0mq assumes int is 4-bytes
unless @sockopt_cache[:int32]
length = FFI::MemoryPointer.new :int32
length.write_int 4
@sockopt_cache[:int32] = [FFI::MemoryPointer.new(:int32), length]