lib/ffi-rzmq/socket.rb in ffi-rzmq-0.8.0 vs lib/ffi-rzmq/socket.rb in ffi-rzmq-0.8.2

- old
+ new

@@ -82,24 +82,26 @@ # #Context. See #ContextError. # SocketError:: See all of the possibilities in the docs for #SocketError. # def setsockopt option_name, option_value, option_len = nil option_value = sanitize_value option_name, option_value - option_len ||= option_value.size begin case option_name when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC + option_len = 8 # all of these options are defined as int64_t or uint64_t option_value_ptr = LibC.malloc option_len - option_value_ptr.write_long option_value + option_value_ptr.write_long_long option_value when LINGER, RECONNECT_IVL, BACKLOG option_len = 4 # hard-code "int" length to 4 bytes option_value_ptr = LibC.malloc option_len option_value_ptr.write_int option_value when IDENTITY, SUBSCRIBE, UNSUBSCRIBE + option_len ||= option_value.size + # note: not checking errno for failed memory allocations :( option_value_ptr = LibC.malloc option_len option_value_ptr.write_string option_value else @@ -145,11 +147,11 @@ begin option_value = FFI::MemoryPointer.new :pointer option_length = FFI::MemoryPointer.new(:size_t) rescue FFI::MemoryPointer.new(:ulong) unless [ - RCVMORE, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, IDENTITY, + TYPE, RCVMORE, HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, MCAST_LOOP, IDENTITY, SNDBUF, RCVBUF, FD, EVENTS, LINGER, RECONNECT_IVL, BACKLOG, RECOVERY_IVL_MSEC ].include? option_name # we didn't understand the passed option argument # will force a raise error_check ZMQ_SETSOCKOPT_STR, -1 @@ -165,11 +167,11 @@ when RCVMORE, MCAST_LOOP # boolean return ret = option_value.read_long_long != 0 when HWM, SWAP, AFFINITY, RATE, RECOVERY_IVL, SNDBUF, RCVBUF, RECOVERY_IVL_MSEC ret = option_value.read_long_long - when LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS + when TYPE, LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS ret = option_value.read_int when IDENTITY ret = option_value.read_string(option_length.read_long_long) end @@ -255,11 +257,11 @@ begin result_code = LibZMQ.zmq_send @socket, message.address, flags # when the flag isn't set, do a normal error check # when set, check to see if the message was successfully queued - queued = flags != NOBLOCK ? error_check(ZMQ_SEND_STR, result_code) : error_check_nonblock(result_code) + queued = noblock?(flags) ? error_check_nonblock(result_code) : error_check(ZMQ_SEND_STR, result_code) end # true if sent, false if failed/EAGAIN queued end @@ -278,11 +280,29 @@ message = Message.new message_string result_code = send_and_close message, flags result_code end - + + # Send a sequence of strings as a multipart message out of the +parts+ + # passed in for transmission. Every element of +parts+ should be + # a String. + # + # +flags+ may be ZMQ::NOBLOCK. + # + # Raises the same exceptions as Socket#send. + # + def send_strings parts, flags = 0 + return false if !parts || parts.empty? + + parts[0...-1].each do |part| + return false unless send_string part, flags | ZMQ::SNDMORE + end + + send_string parts[-1], flags + end + # Sends a message. This will automatically close the +message+ for both successful # and failed sends. # # Raises the same exceptions as Socket#send # @@ -353,10 +373,23 @@ ensure message.close end end + # Receive a multipart message as a list of strings. + # + # +flags+ may be ZMQ::NOBLOCK. + # + # Raises the same exceptions as Socket#recv. + # + def recv_strings flags = 0 + parts = [] + parts << recv_string(flags) + parts << recv_string(flags) while more_parts? + parts + end + private def noblock? flag (NOBLOCK & flag) == NOBLOCK end @@ -383,10 +416,10 @@ length.write_long_long 8 @sockopt_cache[:int64] = [FFI::MemoryPointer.new(:int64), length] end @sockopt_cache[:int64] - when LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS + when TYPE, LINGER, RECONNECT_IVL, BACKLOG, FD, EVENTS # int, 0mq assumes int is 4-bytes unless @sockopt_cache[:int32] length = FFI::MemoryPointer.new :int32 length.write_int 4 @sockopt_cache[:int32] = [FFI::MemoryPointer.new(:int32), length]