lib/0mq/socket.rb in 0mq-0.1.0 vs lib/0mq/socket.rb in 0mq-0.1.1
- old
+ new
@@ -1,15 +1,17 @@
+require_relative 'socket/options'
+
module ZMQ
class Socket
attr_reader :ptr
attr_reader :context
attr_reader :type
- def initialize(type, context:ZMQ::DefaultContext)
- @context = context
+ def initialize(type, opts={})
+ @context = opts.fetch :context, ZMQ::DefaultContext
@type = type
@ptr = LibZMQ.zmq_socket @context.ptr, @type
@msgptr = FFI::MemoryPointer.new LibZMQ::Message.size, 1, false
@@ -113,10 +115,39 @@
break unless get_opt(ZMQ::RCVMORE)
end
end
end
+ # Send a multipart message as routing array and a body array
+ # All parts before an empty part are considered routing parts,
+ # and all parta after the empty part are considered body parts.
+ # The empty delimiter part should not be included in the input arrays.
+ def send_with_routing(routing, body)
+ send_array [*routing, '', *body]
+ end
+
+ # Receive a multipart message as routing array and a body array
+ # All parts before an empty part are considered routing parts,
+ # and all parta after the empty part are considered body parts.
+ # The empty delimiter part is not included in the resulting arrays.
+ def recv_with_routing
+ [[],[]].tap do |routing, body|
+ loop do
+ nxt = recv_string
+ break if nxt.empty?
+ routing << nxt
+ raise ArgumentError, "Expected empty routing delimiter in "\
+ "multipart message: #{routing}" \
+ unless get_opt ZMQ::RCVMORE
+ end
+ loop do
+ body << recv_string
+ break unless get_opt(ZMQ::RCVMORE)
+ end
+ end
+ end
+
# Set a socket option
def set_opt(option, value)
type = @@option_types.fetch(option) \
{ raise ArgumentError, "Unknown option: #{option}" }
@@ -168,92 +199,9 @@
@temp_buffers[type] ||= [
FFI::MemoryPointer.new(type),
FFI::MemoryPointer.new(:size_t).write_int(FFI.type_size(type))
]
end
-
- @@option_types = {
- # Get options
- ZMQ::RCVMORE => :bool,
- ZMQ::RCVHWM => :int,
- ZMQ::AFFINITY => :uint64,
- ZMQ::IDENTITY => :string,
- ZMQ::RATE => :int,
- ZMQ::RECOVERY_IVL => :int,
- ZMQ::SNDBUF => :int,
- ZMQ::RCVBUF => :int,
- ZMQ::LINGER => :int,
- ZMQ::RECONNECT_IVL => :int,
- ZMQ::RECONNECT_IVL_MAX => :int,
- ZMQ::BACKLOG => :int,
- ZMQ::MAXMSGSIZE => :int64,
- ZMQ::MULTICAST_HOPS => :int,
- ZMQ::RCVTIMEO => :int,
- ZMQ::SNDTIMEO => :int,
- ZMQ::IPV6 => :bool,
- ZMQ::IPV4ONLY => :bool,
- ZMQ::IMMEDIATE => :bool,
- ZMQ::FD => :int,
- ZMQ::EVENTS => :int,
- ZMQ::LAST_ENDPOINT => :string,
- ZMQ::TCP_KEEPALIVE => :int,
- ZMQ::TCP_KEEPALIVE_IDLE => :int,
- ZMQ::TCP_KEEPALIVE_CNT => :int,
- ZMQ::TCP_KEEPALIVE_INTVL => :int,
- ZMQ::MECHANISM => :int,
- ZMQ::PLAIN_SERVER => :int,
- ZMQ::PLAIN_USERNAME => :string,
- ZMQ::PLAIN_PASSWORD => :string,
- ZMQ::CURVE_PUBLICKEY => :string,
- ZMQ::CURVE_SECRETKEY => :string,
- ZMQ::CURVE_SERVERKEY => :string,
- ZMQ::ZAP_DOMAIN => :string,
- }.merge({
- # Set options
- ZMQ::SNDHWM => :int,
- ZMQ::RCVHWM => :int,
- ZMQ::AFFINITY => :uint64,
- ZMQ::SUBSCRIBE => :string,
- ZMQ::UNSUBSCRIBE => :string,
- ZMQ::IDENTITY => :string,
- ZMQ::RATE => :int,
- ZMQ::RECOVERY_IVL => :int,
- ZMQ::SNDBUF => :int,
- ZMQ::RCVBUF => :int,
- ZMQ::LINGER => :int,
- ZMQ::RECONNECT_IVL => :int,
- ZMQ::RECONNECT_IVL_MAX => :int,
- ZMQ::RECONNECT_IVL => :int,
- ZMQ::BACKLOG => :int,
- ZMQ::MAXMSGSIZE => :int64,
- ZMQ::MULTICAST_HOPS => :int,
- ZMQ::RCVTIMEO => :int,
- ZMQ::SNDTIMEO => :int,
- ZMQ::IPV6 => :bool,
- ZMQ::IPV4ONLY => :bool,
- ZMQ::IMMEDIATE => :bool,
- ZMQ::ROUTER_HANDOVER => :int,
- ZMQ::ROUTER_MANDATORY => :int,
- ZMQ::ROUTER_RAW => :int,
- ZMQ::PROBE_ROUTER => :int,
- ZMQ::XPUB_VERBOSE => :int,
- ZMQ::REQ_CORRELATE => :int,
- ZMQ::REQ_RELAXED => :int,
- ZMQ::TCP_KEEPALIVE => :int,
- ZMQ::TCP_KEEPALIVE_IDLE => :int,
- ZMQ::TCP_KEEPALIVE_CNT => :int,
- ZMQ::TCP_KEEPALIVE_INTVL => :int,
- ZMQ::TCP_ACCEPT_FILTER => :string,
- ZMQ::PLAIN_SERVER => :int,
- ZMQ::PLAIN_USERNAME => :string,
- ZMQ::PLAIN_PASSWORD => :string,
- ZMQ::CURVE_SERVER => :int,
- ZMQ::CURVE_PUBLICKEY => :string,
- ZMQ::CURVE_SECRETKEY => :string,
- ZMQ::CURVE_SERVERKEY => :string,
- ZMQ::ZAP_DOMAIN => :string,
- ZMQ::CONFLATE => :bool,
- })
end
end