lib/0mq/socket.rb in 0mq-0.1.0 vs lib/0mq/socket.rb in 0mq-0.1.1

- old
+ new

@@ -1,15 +1,17 @@ +require_relative 'socket/options' + module ZMQ class Socket attr_reader :ptr attr_reader :context attr_reader :type - def initialize(type, context:ZMQ::DefaultContext) - @context = context + def initialize(type, opts={}) + @context = opts.fetch :context, ZMQ::DefaultContext @type = type @ptr = LibZMQ.zmq_socket @context.ptr, @type @msgptr = FFI::MemoryPointer.new LibZMQ::Message.size, 1, false @@ -113,10 +115,39 @@ break unless get_opt(ZMQ::RCVMORE) end end end + # Send a multipart message as routing array and a body array + # All parts before an empty part are considered routing parts, + # and all parta after the empty part are considered body parts. + # The empty delimiter part should not be included in the input arrays. + def send_with_routing(routing, body) + send_array [*routing, '', *body] + end + + # Receive a multipart message as routing array and a body array + # All parts before an empty part are considered routing parts, + # and all parta after the empty part are considered body parts. + # The empty delimiter part is not included in the resulting arrays. + def recv_with_routing + [[],[]].tap do |routing, body| + loop do + nxt = recv_string + break if nxt.empty? + routing << nxt + raise ArgumentError, "Expected empty routing delimiter in "\ + "multipart message: #{routing}" \ + unless get_opt ZMQ::RCVMORE + end + loop do + body << recv_string + break unless get_opt(ZMQ::RCVMORE) + end + end + end + # Set a socket option def set_opt(option, value) type = @@option_types.fetch(option) \ { raise ArgumentError, "Unknown option: #{option}" } @@ -168,92 +199,9 @@ @temp_buffers[type] ||= [ FFI::MemoryPointer.new(type), FFI::MemoryPointer.new(:size_t).write_int(FFI.type_size(type)) ] end - - @@option_types = { - # Get options - ZMQ::RCVMORE => :bool, - ZMQ::RCVHWM => :int, - ZMQ::AFFINITY => :uint64, - ZMQ::IDENTITY => :string, - ZMQ::RATE => :int, - ZMQ::RECOVERY_IVL => :int, - ZMQ::SNDBUF => :int, - ZMQ::RCVBUF => :int, - ZMQ::LINGER => :int, - ZMQ::RECONNECT_IVL => :int, - ZMQ::RECONNECT_IVL_MAX => :int, - ZMQ::BACKLOG => :int, - ZMQ::MAXMSGSIZE => :int64, - ZMQ::MULTICAST_HOPS => :int, - ZMQ::RCVTIMEO => :int, - ZMQ::SNDTIMEO => :int, - ZMQ::IPV6 => :bool, - ZMQ::IPV4ONLY => :bool, - ZMQ::IMMEDIATE => :bool, - ZMQ::FD => :int, - ZMQ::EVENTS => :int, - ZMQ::LAST_ENDPOINT => :string, - ZMQ::TCP_KEEPALIVE => :int, - ZMQ::TCP_KEEPALIVE_IDLE => :int, - ZMQ::TCP_KEEPALIVE_CNT => :int, - ZMQ::TCP_KEEPALIVE_INTVL => :int, - ZMQ::MECHANISM => :int, - ZMQ::PLAIN_SERVER => :int, - ZMQ::PLAIN_USERNAME => :string, - ZMQ::PLAIN_PASSWORD => :string, - ZMQ::CURVE_PUBLICKEY => :string, - ZMQ::CURVE_SECRETKEY => :string, - ZMQ::CURVE_SERVERKEY => :string, - ZMQ::ZAP_DOMAIN => :string, - }.merge({ - # Set options - ZMQ::SNDHWM => :int, - ZMQ::RCVHWM => :int, - ZMQ::AFFINITY => :uint64, - ZMQ::SUBSCRIBE => :string, - ZMQ::UNSUBSCRIBE => :string, - ZMQ::IDENTITY => :string, - ZMQ::RATE => :int, - ZMQ::RECOVERY_IVL => :int, - ZMQ::SNDBUF => :int, - ZMQ::RCVBUF => :int, - ZMQ::LINGER => :int, - ZMQ::RECONNECT_IVL => :int, - ZMQ::RECONNECT_IVL_MAX => :int, - ZMQ::RECONNECT_IVL => :int, - ZMQ::BACKLOG => :int, - ZMQ::MAXMSGSIZE => :int64, - ZMQ::MULTICAST_HOPS => :int, - ZMQ::RCVTIMEO => :int, - ZMQ::SNDTIMEO => :int, - ZMQ::IPV6 => :bool, - ZMQ::IPV4ONLY => :bool, - ZMQ::IMMEDIATE => :bool, - ZMQ::ROUTER_HANDOVER => :int, - ZMQ::ROUTER_MANDATORY => :int, - ZMQ::ROUTER_RAW => :int, - ZMQ::PROBE_ROUTER => :int, - ZMQ::XPUB_VERBOSE => :int, - ZMQ::REQ_CORRELATE => :int, - ZMQ::REQ_RELAXED => :int, - ZMQ::TCP_KEEPALIVE => :int, - ZMQ::TCP_KEEPALIVE_IDLE => :int, - ZMQ::TCP_KEEPALIVE_CNT => :int, - ZMQ::TCP_KEEPALIVE_INTVL => :int, - ZMQ::TCP_ACCEPT_FILTER => :string, - ZMQ::PLAIN_SERVER => :int, - ZMQ::PLAIN_USERNAME => :string, - ZMQ::PLAIN_PASSWORD => :string, - ZMQ::CURVE_SERVER => :int, - ZMQ::CURVE_PUBLICKEY => :string, - ZMQ::CURVE_SECRETKEY => :string, - ZMQ::CURVE_SERVERKEY => :string, - ZMQ::ZAP_DOMAIN => :string, - ZMQ::CONFLATE => :bool, - }) end end