lib/msgpack/rpc.rb in msgpack-rpc-0.2.0 vs lib/msgpack/rpc.rb in msgpack-rpc-0.3.0

- old
+ new

@@ -1,9 +1,9 @@ # # MessagePack-RPC for Ruby # -# Copyright (C) 2009 FURUHASHI Sadayuki +# Copyright (C) 2010 FURUHASHI Sadayuki # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # @@ -15,10 +15,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # require 'msgpack' require 'rev' +require 'zlib' module MessagePack module RPC @@ -49,26 +50,21 @@ def initialize(msg = "connect failed") super end end -class Responder - def initialize(socket, msgid) - @socket = socket - @msgid = msgid - end - def result(retval, err = nil) - @socket.write RPCSocket.pack_response(@msgid, retval, err) - end +REQUEST = 0 # [0, msgid, method, param] +RESPONSE = 1 # [1, msgid, error, result] +NOTIFY = 2 # [2, method, param] +SESSION = 3 # [3, addr] +OPTION = 4 # [4, txopt, rxopt] #[, rxproto, rxaddr] +OPT_DEFLATE = 0b00000001 +#PROTO_TCP = 0 +#PROTO_UDP = 1 - def error(err) - result(nil, err) - end -end - class Address # +--+----+ # | 2| 4 | # +--+----+ # port network byte order @@ -82,30 +78,34 @@ # test = Socket.pack_sockaddr_in(0,'0.0.0.0') if test[0] == "\0"[0] || test[1] == "\0"[0] # Linux - IMPL_LINUX = true + def initialize(host, port) + raw = Socket.pack_sockaddr_in(port, host) + family = raw.unpack('S')[0] + if family == Socket::AF_INET + @serial = raw[2,6] + elsif family == Socket::AF_INET6 + @serial = raw[2,2] + raw[8,16] + else + raise "Unknown address family: #{family}" + end + end else # BSD - IMPL_LINUX = false - end - - def initialize(host, port) - raw = Socket.pack_sockaddr_in(port, host) - if IMPL_LINUX - family = raw.unpack('S')[0] - else + def initialize(host, port) + raw = Socket.pack_sockaddr_in(port, host) family = raw.unpack('CC')[1] + if family == Socket::AF_INET + @serial = raw[2,6] + elsif family == Socket::AF_INET6 + @serial = raw[2,2] + raw[8,16] + else + raise "Unknown address family: #{family}" + end end - if family == Socket::AF_INET - @serial = raw[2,6] - elsif family == Socket::AF_INET6 - @serial = raw[2,2] + raw[8,16] - else - raise "Unknown address family: #{family}" - end end def host unpack[0] end @@ -164,11 +164,10 @@ def inspect "#<#{self.class} #{to_s} @serial=#{@serial.inspect}>" end - def eql?(o) o.class == Address && dump.eql?(o.dump) end def hash @@ -179,257 +178,285 @@ eql?(o) end end -REQUEST = 0 -RESPONSE = 1 -NOTIFY = 2 -INIT = 3 - - -class RPCSocket < Rev::TCPSocket - def initialize(sock, session) - @buffer = '' - @nread = 0 - @mpac = MessagePack::Unpacker.new - @s = session - super(sock) +class Future + def initialize(s, loop, block = nil) + @s = s + @timeout = s.timeout + @loop = loop + @block = block + @error = nil + @result = nil end + attr_reader :loop + attr_accessor :result, :error - def bind_session(s) - @s = s + def attach_callback(proc = nil, &block) + @block = proc || block end - def bound? - !@s.nil? + def call(err, res) + @error = err + @result = res + if @block + @block.call(err, res) + end + @s = nil end - def on_read(data) - @buffer << data - - while true - @nread = @mpac.execute(@buffer, @nread) - - if @mpac.finished? - msg = @mpac.data - - @mpac.reset - @buffer.slice!(0, @nread) - @nread = 0 - - on_message(msg) # RPCSocket#on_message - - next unless @buffer.empty? - end - - break + def join + while @s + @loop.run_once end + self end - def on_message(msg) - case msg[0] - when REQUEST - on_request(msg[1], msg[2], msg[3]) - when RESPONSE - on_response(msg[1], msg[2], msg[3]) - when NOTIFY - on_notify(msg[1], msg[2]) - when INIT - on_init(msg[1]) + def step_timeout + if @timeout < 1 + true else - raise RPCError.new("unknown message type #{msg[0]}") + @timeout -= 1 + false end end +end - def on_connect - return unless @s - @s.on_connect(self) - end - def on_connect_failed - return unless @s - @s.on_connect_failed(self) +class Responder + def initialize(tran, msgid) + @tran = tran # send_message method is required + @msgid = msgid end - def on_close - return unless @s - @s.on_close(self) - @s = nil - rescue - nil + def result(retval, err = nil) + @tran.send_message [RESPONSE, @msgid, err, retval] end - def on_request(msgid, method, param) - return unless @s - @s.on_request(method, param, Responder.new(self,msgid)) + def error(err, retval = nil) + result(retval, err) end +end - def on_notify(method, param) - return unless @s - @s.on_notify(method, param) + +class ExchangeOption + def initialize(flags = 0) + @flags = flags end - def on_response(msgid, error, result) - return unless @s - @s.on_response(msgid, error, result) + def get + @flags end - def on_init(msg) - # ignore + def deflate=(val) + val ? (@flags |= OPT_DEFLATE) : (@flags &= ~OPT_DEFLATE) end - def send_message(msg) - write msg.to_msgpack + def deflate? + @flags & OPT_DEFLATE != 0 end - def self.pack_request(msgid, method, param) - [REQUEST, msgid, method, param].to_msgpack + def reset + @flags = 0 + self end - def self.pack_response(msgid, result, error) - [RESPONSE, msgid, error, result].to_msgpack + def ==(o) + @flags == o.get end - def self.pack_notify(method, param) - [NOTIFY, method, param].to_msgpack + def to_msgpack(out = '') + @flags.to_msgpack(out) end - def self.pack_init(msg) - [INIT, msg].to_msgpack + def self.from_msgpack(obj) + @flags = obj end end +class StreamOption + def initialize(txopt = ExchangeOption.new, rxopt = ExchangeOption.new) #, rxproto = nil, rxaddr = nil) + @txopt = txopt + @rxopt = rxopt + #@rxproto = rxproto # nil or PROTO_TCP or PROTO_UDP + #@rxaddr = rxaddr # nil or address + end + attr_accessor :txopt, :rxopt #, :rxproto, :rxaddr -class Session + def default? + @txopt.get == 0 && @rxopt.get == 0 && @rxproto.nil? + end - class BasicRequest - def initialize(s, loop) - @s = s - @timeout = s.timeout - @loop = loop - end - attr_reader :loop + def reset + @txopt.reset + @rxopt.reset + #@rxproto = nil + #@rxaddr = nil + self + end - def call(err, res) - @s = nil - end + def ==(o) + @txopt == o.txopt && @rxopt == o.rxopt # && + #@rxproto == o.rxproto && @rxaddr == o.rxaddr + end - def join - while @s - @loop.run_once - end - self - end + def to_msgpack(out = '') + array = [OPTION, @txopt, @rxopt] + #if @rxproto + # array << @rxproto + # if @rxaddr + # array << @rxaddr + # end + #end + array.to_msgpack(out) + end - def step_timeout - if @timeout < 1 - true + def self.from_msgpack(obj) + txopt = ExchangeOption.new(obj[1] || 0) + rxopt = ExchangeOption.new(obj[2] || 0) + #if obj[3] + # rxproto = obj[3] + # if obj[4] + # rxaddr = Address.load(obj[4]) + # end + #end + StreamOption.new(txopt, rxopt) #, rxproto, rxaddr) + end +end + +class TransportOption < StreamOption + def initialize(*args) + super() + #@proto = PROTO_TCP + #@address = nil + args.each do |x| + case x + #when :tx_tcp, :tcp + # @proto = PROTO_TCP + #when :tx_udp, :udp + # @proto = PROTO_UDP + #when :rx_tcp + # @rxproto = PROTO_TCP + #when :rx_udp + # @rxproto = PROTO_UDP + when :deflate + @txopt.deflate = true + @rxopt.deflate = true + when :tx_deflate + @txopt.deflate = true + when :rx_deflate + @rxopt.deflate = true + #when PROTO_TCP + # @proto = PROTO_TCP + #when PROTO_UDP + # @proto = PROTO_UDP + #when Address + # @rxaddr = x else - @timeout -= 1 - false + raise "unknown option #{x.inspect}" end end end + #attr_accessor :proto, :address - class AsyncRequest < BasicRequest - def initialize(s, loop) - super(s, loop) - @error = nil - @result = nil - end - attr_accessor :result, :error + def reset + #@proto = PROTO_TCP + @address = nil + super + end - def call(err, res) - @error = err - @result = res - @s = nil - end + def ==(o) + super(o) # && @proto = o.proto && @address = o.address end - class CallbackRequest < BasicRequest - def initialize(s, loop, block) - super(s, loop) - @block = block - end + #def deflate(val = true) txopt.deflate = va; rxopt.deflate = va; self end + #def rx_deflate(val = true) @rxopt.deflate = val; self; end + #def tx_deflate(val = true) @txopt.deflate = val; self; end - def call(err, res) - @block.call(err, res) - end - end + #def tcp; @proto = PROTO_TCP; self; end + #def tx_tcp; @proto = PROTO_TCP; self; end + #def rx_tcp; @rxproto = PROTO_TCP; self; end + #def udp; @proto = PROTO_UDP; self; end + #def rx_udp; @proto = PROTO_UDP; self; end + #def rx_udp; @rxproto = PROTO_UDP; self; end +end - def initialize(initmsg, target_addr, dispatcher, loop) - @initmsg = initmsg - @target_addr = target_addr + +class Session + def initialize(to_addr, dtopt, dispatcher, self_addr, loop) + @address = to_addr # destination address + @self_address = self_addr # self session identifier @dispatcher = dispatcher || NullDispatcher.new + @dtopt = dtopt # default transport option + @dtran = create_transport(@dtopt) # default transport @loop = loop + @trans = [] + @reqtable = {} @timeout = 10 # FIXME default timeout time - @reconnect = 5 # FIXME default reconnect limit reset end - attr_accessor :timeout, :reconnect + attr_reader :address, :self_address, :loop + attr_accessor :timeout - def address - @target_addr + def send(method, *args) + send_over(nil, method, *args) end - def on_connect(sock) - @sockpool.push(sock) - sock.write @pending - @pending = "" - @connecting = 0 + def callback(method, *args, &block) + callback_over(nil, method, *args, &block) end - def on_connect_failed(sock) - if @connecting < @reconnect - try_connect - @connecting += 1 - else - @connecting = 0 - @reqtable.reject! {|msgid, req| - begin - req.call ConnectError.new, nil - rescue - end - true - } - end + def call(method, *args) + call_over(nil, method, *args) end + def notify(method, *args) + notify_over(nil, method, *args) + end - def send(method, *args) - msgid = send_request(method, args) - @reqtable[msgid] = AsyncRequest.new(self, @loop) + def over(*args) + Over.new(self, TransportOption.new(*args)) end - def callback(method, *args, &block) - msgid = send_request(method, args) - @reqtable[msgid] = CallbackRequest.new(self, @loop, block) + def default_option + @dtopt.dup end - def call(method, *args) + def send_over(topt, method, *args) + msgid = send_request(topt, method, args) + @reqtable[msgid] = Future.new(self, @loop) + end + + def callback_over(topt, method, *args, &block) + msgid = send_request(topt, method, args) + @reqtable[msgid] = Future.new(self, @loop, block) + end + + def call_over(topt, method, *args) # FIXME if @reqtable.empty? optimize - req = send(method, *args) + req = send_over(topt, method, *args) req.join if req.error raise req.error if req.error.is_a?(Error) raise RemoteError.new(req.error, req.result) end req.result end - def notify(method, *args) - send_notify(method, args) + def notify_over(topt, method, *args) + send_notify(topt, method, args) end + def send_message_over(topt, msg) # for Over#send_message + get_transport(topt).send_message(msg) + end + def close - @sockpool.reject! {|sock| - sock.detach if sock.attached? - sock.close - true - } + @dtran.close + @trans.each {|tran| tran.close } reset self end def step_timeout @@ -446,81 +473,635 @@ end } !@reqtable.empty? end + def on_message(sock, msg) # Session interface + case msg[0] + when REQUEST + on_request(sock, msg[1], msg[2], msg[3]) + when RESPONSE + on_response(sock, msg[1], msg[2], msg[3]) + when NOTIFY + on_notify(sock, msg[1], msg[2]) + when SESSION + # ignore because session is already bound + else + raise RPCError.new("unknown message type #{msg[0]}") + end + end - def on_response(msgid, error, result) + def on_connect_failed + @reqtable.reject! {|msgid, req| + begin + req.call ConnectError.new, nil + rescue + end + true + } + # FIXME reset? + end + + private + def create_transport(topt) + TCPTransport.new(self, topt) + end + + def get_transport(topt) + if topt.nil? || @dtran.match?(topt) + return @dtran + end + if tran = @trans.find {|f| f.match?(topt) } + return tran + end + tran = create_transport(topt) + @trans.push(tran) + tran + end + + class Over + def initialize(session, topt) + @session = session + @topt = topt + end + + def send(method, *args) + @session.send_over(@topt, method, *args) + end + + def callback(method, *args, &block) + @session.callback_over(@topt, method, *args, &block) + end + + def call(method, *args) + @session.call_over(@topt, method, *args) + end + + def notify(method, *args) + @session.notify_over(@topt, method, *args) + end + + def send_message(msg) # Transport interface for Responder + @session.send_message_over(@topt, msg) + end + end + + def send_notify(topt, method, param) + unless @address + raise RPCError.new("unexpected send request on server session") + end + method = method.to_s unless method.is_a?(Integer) + tran = get_transport(topt) + tran.send_message([NOTIFY, method, param]) + end + + def send_request(topt, method, param) + unless @address + raise RPCError.new("unexpected send request on server session") + end + method = method.to_s unless method.is_a?(Integer) + tran = get_transport(topt) + msgid = @seqid + @seqid += 1; if @seqid >= 1<<31 then @seqid = 0 end + @reqtable[msgid] = Future.new(self, @loop) + tran.send_message([REQUEST, msgid, method, param]) + msgid + end + + def on_response(sock, msgid, error, result) if req = @reqtable.delete(msgid) req.call(error, result) end end - def on_request(method, param, res) + def on_request(sock, msgid, method, param) + #if sock.option.rxproto + # #if @address + # tran = Over.new(self, sock.option) + #else + tran = sock + #end + res = Responder.new(tran, msgid) @dispatcher.dispatch_request(self, method, param, res) end - def on_notify(method, param) + def on_notify(sock, method, param) @dispatcher.dispatch_notify(self, method, param) end def on_close(sock) @sockpool.delete(sock) end - - private def reset - @sockpool = [] @reqtable = {} @seqid = 0 - @pending = "" - @connecting = 0 end +end - def send_request(method, param) - method = method.to_s unless method.is_a?(Integer) - msgid = @seqid - @seqid += 1; if @seqid >= 1<<31 then @seqid = 0 end - send_data RPCSocket.pack_request(msgid, method, param) - msgid + +class BasicTransport + def initialize(session, topt) + @session = session + @option = topt end - def send_notify(method, param) - method = method.to_s unless method.is_a?(Integer) - send_data RPCSocket.pack_notify(method, param) - nil + def match?(topt) + @option == topt end - def send_data(msg) - unless @target_addr - raise RPCError.new("unexpected send request on server session") + #def close; end + + protected + def get_address + #@option.address || @session.address + @session.address + end +end + +class TCPTransport < BasicTransport + def initialize(session, topt) + super(session, topt) + + @pending = "" + @sockpool = [] + @connecting = 0 + @reconnect = 5 # FIXME default reconnect limit + + @initmsg = "" + if session.self_address + @initmsg << [SESSION, session.self_address].to_msgpack end + unless topt.default? + @initmsg << topt.to_msgpack + end + @initmsg = nil if @initmsg.empty? + + if topt.txopt.deflate? + @deflate = Zlib::Deflate.new + else + @deflate = nil + end + end + + def send_message(msg) # Transport interface if @sockpool.empty? if @connecting == 0 try_connect @connecting = 1 end - @pending << msg + if @deflate + @pending << @deflate.deflate(msg.to_msgpack, Zlib::SYNC_FLUSH) + else + @pending << msg.to_msgpack + end else # FIXME pesudo connection load balance sock = @sockpool.first - sock.write msg + sock.send_message(msg) end end + def close # Transport interface + @sockpool.reject! {|sock| + sock.detach if sock.attached? + sock.close + true + } + @sockpool = [] + @connecting = 0 + @pending = "" + @deflate.reset if @deflate + self + end + + def on_connect(sock) + @sockpool.push(sock) + sock.send_pending(@pending, @deflate) + @pending = "" + @deflate = Zlib::Deflate.new if @deflate + @connecting = 0 + end + + def on_connect_failed(sock) + if @connecting < @reconnect + try_connect + @connecting += 1 + else + @connecting = 0 + @pending = "" + @session.on_connect_failed + end + end + + def on_close(sock) + @sockpool.delete(sock) + end + + private def try_connect - port, host = ::Socket.unpack_sockaddr_in(@target_addr.sockaddr) - sock = RPCSocket.connect(host, port, self) # async connect + addr = get_address + if addr.nil? + return # FIXME raise? + end + host, port = *addr + sock = ActiveSocket.connect(host, port, self, @option, @session) # async connect if @initmsg sock.write @initmsg end - @loop.attach(sock) + @session.loop.attach(sock) end end +class TCPTransport::Socket < Rev::TCPSocket + def initialize(sock, session) + @buffer = '' + @nread = 0 + @mpac = MessagePack::Unpacker.new + @deflate = nil + @inflate = nil + @s = session + super(sock) + end + def session + @s + end + + def on_read(data) + if @inflate + data = @inflate.inflate(data) + return if data.empty? + end + @buffer << data + + while true + @nread = @mpac.execute(@buffer, @nread) + if @mpac.finished? + msg = @mpac.data + @mpac.reset + @buffer.slice!(0, @nread) + @nread = 0 + on_message(msg) + next unless @buffer.empty? + end + break + end + end + + def on_message(msg) + return unless @s + @s.on_message(self, msg) + end + + def on_close + @deflate.close if @deflate + @inflate.close if @inflate + end + + def send_message(msg) # Transport interface + if @deflate + data = @deflate.deflate(msg.to_msgpack, Zlib::SYNC_FLUSH) + else + data = msg.to_msgpack + end + write data + end +end + +class TCPTransport::ActiveSocket < TCPTransport::Socket + def initialize(sock, tran, topt, session) + super(sock, session) + @tran = tran + set_option(topt) + end + + def on_connect + return unless @tran + @tran.on_connect(self) + end + + def on_connect_failed + return unless @tran + @tran.on_connect_failed(self) + end + + def on_close + return unless @tran + @tran.on_close(self) + @tran = nil + @s = nil + rescue + nil + ensure + super + end + + def send_pending(data, z) + write data + @deflate = z + end + + private + def set_option(sopt) # stream option + if sopt.txopt.deflate? + @deflate = Zlib::Deflate.new + end + if sopt.rxopt.deflate? + @inflate = Zlib::Inflate.new + #@buffer = @inflate.inflate(@buffer) unless @buffer.empty? + end + end +end + +class TCPTransport::PassiveSocket < TCPTransport::Socket + def initialize(sock, create_session) + super(sock, create_session.call) + @option = TransportOption.new # active option (reversed) + end + + attr_reader :option # for Session#on_request + + def rebind(session) + @s = session + end + + def on_message(msg) + if msg[0] == OPTION + sopt = StreamOption.from_msgpack(msg) + set_option(sopt) + return + end + super(msg) + end + + private + def set_option(sopt) + if sopt.txopt.deflate? + @inflate = Zlib::Inflate.new + @buffer = @inflate.inflate(@buffer) unless @buffer.empty? + end + if sopt.rxopt.deflate? + @deflate = Zlib::Deflate.new + end + # rx-tx reverse + @option = TransportOption.new + @option.txopt = sopt.rxopt + @option.rxopt = sopt.txopt + #@option.proto = sopt.rxproto || PROTO_TCP + #@option.address = sopt.rxaddr + end +end + +class TCPTransport::Listener + def initialize(host, port, &create_session) + @lsock = Rev::TCPServer.new(host, port, TCPTransport::PassiveSocket, create_session) + end + + def activate(loop) + loop.attach(@lsock) + end + + def close + @lsock.detach if @lsock.attached? + @lsock.close + end +end + + +=begin +class UDPTransport < BasicTransport + def initialize(session, topt) + super(session, topt) + + initmsg = "" + if session.self_address + initmsg << [SESSION, session.self_address].to_msgpack + end + unless topt.default? + initmsg << topt.to_msgpack + end + initmsg = nil if initmsg.empty? + + sock = UDPSocket.new + @addr = get_address + @asock = ActiveSocket.new(sock, initmsg, topt, session) + end + + def send_message(msg) # Transport interface + @asock.send_message_to(msg, @addr) + end + + def close + @sock.close + @deflate.close if @deflate + end +end + +class UDPTransport::Socket < Rev::IO + def initialize(sock, create_session) + @sock = sock + @mpac = MessagePack::Unpacker.new + @create_session = create_session + super(sock) + end + + def on_readable + begin + buffer, from = @sock.recvfrom(65536) + rescue Errno::EAGAIN + return + rescue Errno::EINTR + return + end + # addr = Address.from_sockaddr(from) FIXME + on_recv(buffer, addr) + end + + def on_message(rc, msg) + return unless @s + @s.on_message(rc, msg) + end + + def send_message_with(initmsg, msg, deflate, addr) + if deflate + d2 = deflate.deflate(msg.to_msgpack, Zlib::SYNC_FLUSH) + else + d2 = msg.to_msgpack + end + if initmsg + data = d2 + else + data = initmsg.dup + data << d2 + end + @sock.sendto(data, 0, addr.sockaddr) + end +end + +class UDPTransport::ActiveSocket < UDPTransport::Socket + def initialize(sock, initmsg, topt, session) + @sock = sock + super(sock) + if topt.txopt.deflate? + @deflate = Zlib::Deflate.new + else + @deflate = nil + end + @initmsg = initmsg + @s = session + end + + def on_recv(data, addr) + rc = ReturnContext.new(self, addr) + if @inflate + data = @inflate.inflate(data) + @inflate.finish + return if data.empty? + end + nread = 0 + while true + nread = @mpac.execute(data, nread) + if @mpac.finished? + msg = @mpac.data + @mpac.reset + data.slice!(0, nread) + nread = 0 + on_message(rc, msg) + next unless data.empty? + else + @mpac.reset + end + break + end + end + + def send_message_to(msg, addr) + send_message_with(@initmsg, msg, @deflate, addr) + end + + private + class ReturnContext + def initialize(asock, addr) + @asock = asock + @addr = addr + end + + def send_message(msg) # Transport interface + @asock.send_message_to(msg, @addr) + end + end +end + +class UDPTransport::PassiveSocket < UDPTransport::Socket + def initialize(sock, create_session) + super(sock) + @create_session = create_session + @inflate = LazyInflate.new + @deflate = LazyInflate.new + end + + def on_recv(data, addr) + rc = PassiveReturnContext.new(self, addr) + nread = 0 + while true + nread = @mpac.execute(data, nread) + if @mpac.finished? + msg = @mpac.data + @mpac.reset + data.slice!(0, nread) + nread = 0 + data = on_message(data, rc, msg) + next unless data.empty? + else + @mpac.reset + end + break + end + end + + def on_message(data, rc, msg) + if msg[0] == OPTION + sopt = StreamOption.from_msgpack(msg) + return set_option(data, rc, sopt) + end + super(rc, msg) + end + + attr_reader :deflate # for ReturnContext#send_message + + private + class ReturnContext + def initialize(psock, addr) + @psock = psock + @addr = addr + @option = TransportOption.new + end + attr_accessor :option + + def send_message(msg) # Transport interface + if @option.txopt.deflate? + deflate = @psock.deflate.get + else + deflate = nil + end + @psock.send_message_with(nil, msg, deflate, @addr) + end + end + + class LazyDeflate + def initialize + @deflate = nil + end + def get + @deflate ||= Zlib::Deflate.new + end + end + + class LazyInflate + def initialize + @inflate = nil + end + def get + @inflate ||= Zlib::Inflate.new + end + end + + def set_option(data, rc, sopt) + if sopt.txopt.deflate? + inflate = @inflate.get + data = inflate.inflate(data) + @inflate.finish + end + # rx-tx reverse + rc.option.reset + rc.option.txopt = sopt.rxopt + rc.option.proto = sopt.rxproto || PROTO_UDP + rc.option.address = sopt.rxaddr + data + end +end + +class UDPTransport::Listener + def initialize(host, port, &create_session) + @sock = UDPSocket.new + @sock.bind(host, port) + @psock = PassiveSocket.new(@sock, &create_session) + end + + def activate(loop) + loop.attach(@psock) + end + + def close + @psock.detach if @psock.attached? + @psock.close + end +end +=end + + class AsyncResult def initialize @responder = nil @sent = false end @@ -660,27 +1241,27 @@ end end class Client < Session - def initialize(host, port, loop = Loop.new) + def initialize(host, port, loop = Loop.new, dtopt = TransportOption.new) @loop = loop @host = host @port = port - target_addr = Address.new(host, port) - super(nil, target_addr, NullDispatcher.new, loop) + addr = Address.new(host, port) + super(addr, dtopt, NullDispatcher.new, nil, loop) @timer = Timer.new(1, true) { step_timeout } loop.attach(@timer) end attr_reader :host, :port - def self.open(host, port, loop = Loop.new, &block) - cli = new(host, port, loop) + def self.open(host, port, loop = Loop.new, dtopt = TransportOption.new, &block) + cli = new(host, port, loop, dtopt) begin block.call(cli) ensure cli.close end @@ -694,111 +1275,136 @@ include LoopUtil end class SessionPool - def initialize(loop = Loop.new) + def initialize(loop = Loop.new, dtopt = TransportOption.new) @loop = loop @spool = {} @stimer = Timer.new(1, true, &method(:step_timeout)) + @dtopt = dtopt loop.attach(@stimer) end def get_session(host, port) - target_addr = Address.new(host, port) - @spool[target_addr] ||= create_session(target_addr) + addr = Address.new(host, port) + get_session_addr(addr) end + def get_session_addr(addr) + @spool[addr] ||= create_session(addr) + end + def close - @spool.reject! {|target_addr, s| + @spool.reject! {|addr, s| s.close true } @stimer.detach if @stimer.attached? nil end include LoopUtil protected - def create_session(target_addr) - Session.new(nil, target_addr, nil, @loop) + def create_session(addr) + Session.new(addr, @dtopt, nil, nil, @loop) end private def step_timeout - @spool.each_pair {|target_addr, s| + @spool.each_pair {|addr, s| s.step_timeout } end end class Server < SessionPool - class Socket < RPCSocket - def initialize(sock, dispatcher, loop) - s = Session.new(nil, nil, dispatcher, loop) - super(sock, s) - end + def initialize(loop = Loop.new) + super(loop, TransportOption.new) + @dispatcher = nil + @listeners = [] end - def initialize(loop = Loop.new) - super(loop) - @lsocks = [] + def serve(obj, accept = obj.public_methods) + @dispatcher = ObjectDispatcher.new(obj, accept) end - def listen(host, port, obj, accept = obj.public_methods) - dispatcher = ObjectDispatcher.new(obj, accept) - lsock = Rev::TCPServer.new(host, port, Server::Socket, dispatcher, @loop) - @lsocks.push(lsock) - @loop.attach(lsock) + def listen(host, port, obj = nil, accept = obj.public_methods) + unless obj.nil? + serve(obj, accept) + end + listen_real(host, port) + self end + def listen_real(host, port) #, proto = PROTO_TCP) + creator = Proc.new { create_session(nil) } + #case proto + #when PROTO_TCP, :tcp + # listener = TCPTransport::Listener.new(host, port, &creator) + #when PROTO_UDP, :udp + # listener = UDPTransport::Listener.new(host, port, &creator) + #else + # raise "unknown option: #{proto.inspect}" + #end + listener = TCPTransport::Listener.new(host, port, &creator) + @listeners.push(listener) + listener.activate(@loop) + self + end + def close - @lsocks.reject! {|lsock| - lsock.detach if lsock.attached? - lsock.close + @listeners.reject! {|listener| + listener.close true } super end + + protected + def create_session(addr) + Session.new(addr, @dtopt, @dispatcher, nil, @loop) + end end class Cluster < SessionPool - class Socket < RPCSocket - def initialize(sock, binder) - @binder = binder - super(sock, nil) - end - - def on_message(msg) - if bound? - super - else - @binder.call(self, msg) - end - end - end - - def initialize(host, port, loop = Loop.new) - super(loop) + def initialize(host, port, loop = Loop.new, dtopt = TransportOption.new) + super(loop, dtopt) @host = host @port = port @dispatcher = nil - self_addr = Address.new(host, port) - @initmsg = RPCSocket.pack_init(self_addr) + @self_addr = Address.new(host, port) + @listeners = [] + listen(host, port) # FIXME obsolete? end attr_reader :host, :port def serve(obj, accept = obj.public_methods) @dispatcher = ObjectDispatcher.new(obj, accept) - @lsock = Rev::TCPServer.new(host, port, Cluster::Socket, method(:lazy_bind)) - @loop.attach(@lsock) self end + def listen(host, port) #, proto = PROTO_TCP) + lz = LazyBinder.new(self) + creator = Proc.new { lz } + #case proto + #when PROTO_TCP, :tcp + # listener = TCPTransport::Listener.new(host, port, &creator) + #when PROTO_UDP, :udp + # listener = UDPTransport::Listener.new(host, port, &creator) + #else + # raise "unknown option: #{proto.inspect}" + #end + listener = TCPTransport::Listener.new(host, port, &creator) + @listeners.push(listener) + listener.activate(@loop) + self + end + def close if @lsock @lsock.detach if @lsock.attached? @lsock.close end @@ -806,30 +1412,41 @@ end include LoopUtil protected - def create_session(target_addr) - Session.new(@initmsg, target_addr, @dispatcher, @loop) + def create_session(addr) + Session.new(addr, @dtopt, @dispatcher, @self_addr, @loop) end + public + def create_server_session + # FIXME + Session.new(nil, @dtopt, @dispatcher, @self_addr, @loop) + end + private - def lazy_bind(sock, msg) - if msg[0] == INIT - # cluster - target_addr = Address.load(msg[1]) - s = Session.new(@initmsg, target_addr, @dispatcher, @loop) - sock.bind_session(s) - @spool[target_addr] = s - else - # subsys - s = Session.new(nil, nil, @dispatcher, @loop) - sock.bind_session(s) - sock.on_message(msg) + class LazyBinder < Session + def initialize(cluster) + @cluster = cluster end + + def on_message(sock, msg) # Session interface + if msg[0] == SESSION + # cluster + addr = Address.load(msg[1]) + session = @cluster.get_session_addr(addr) + sock.rebind(session) + else + # subsys + session = @cluster.create_server_session + sock.rebind(session) + sock.on_message(msg) + end + end end end -end # module RPC -end # module MessagePack +end +end