lib/msgpack/rpc.rb in msgpack-rpc-0.3.0 vs lib/msgpack/rpc.rb in msgpack-rpc-0.4.0

- old
+ new

@@ -1,7 +1,7 @@ # -# MessagePack-RPC for Ruby +# MessagePack-RPC for Ruby TCP transport # # Copyright (C) 2010 FURUHASHI Sadayuki # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,1440 +13,154 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # -require 'msgpack' -require 'rev' -require 'zlib' +module MessagePack #:nodoc: -module MessagePack +# MessagePack-RPC is an inter-process messaging library that uses +# MessagePack for object serialization. The goal of the project is +# providing fast and scalable messaging system for server, client +# and cluster applications. +# +# == Client API +# +# MessagePack::RPC::Client and MessagePack::RPC::SessionPool are for RPC clients. +# +# === Simple usage +# Client is subclass of Session. Use Session#call method to call remote methods. +# +# require 'msgpack/rpc' # gem install msgpack-rpc +# +# client = MessagePack::RPC::Client.new('127.0.0.1', 18800) +# +# result = client.call(:methodName, arg1, arg2, arg3) +# +# === Asynchronous call +# Use Session#call_async method to call remote methods asynchronously. It returns a Future. Use Future#get or Future#attach_callback to get actual result. +# +# require 'msgpack/rpc' # gem install msgpack-rpc +# +# client = MessagePack::RPC::Client.new('127.0.0.1', 18800) +# +# future1 = client.call(:method1, arg1) # call two methods concurrently +# future2 = client.call(:method2, arg1) +# +# result1 = future1.get +# result2 = future2.get +# +# === Connection pooling +# +# SessionPool#get_session returns a Session. It pools created session and enables you to reuse established connections. +# +# +# == Server API +# +# MessagePack::RPC::Server is for RPC servers. +# +# === Simple usage +# +# The public methods of handler class becomes callbale. +# +# require 'msgpack/rpc' # gem install msgpack-rpc +# +# class MyHandler +# def methodName(arg1, arg2, arg3) +# puts "received" +# return "return result." +# end +# end +# +# server = MessagePack::RPC::Server.new +# server.listen('0.0.0.0', 18800, MyHandler.new) +# server.run +# +# === Advance return +# +# In the handler method, you can use *yield* to send the result without returning. +# +# class MyHandler +# def method1(arg1) +# yield("return result.") +# puts "you can do something after returning the result" +# end +# end +# +# === Delayed return +# +# You can use AsyncResult to return results later. +# +# class MyHandler +# def method2(arg1) +# as = MessagePack::RPC::AsyncResult.new +# Thread.new do +# sleep 10 # return result 10 seconds later. +# as.result("return result.") +# end +# return as +# end +# end +# +# +# You can receive and send any objects that can be serialized by MessagePack. +# This means that the objects required to implement *to_msgpack(out = '')* method. +# +# +# == Transports +# +# You can use UDP and UNIX domain sockets instead of TCP. +# +# === For clients +# +# For clients, use MessagePack::RPC::UDPTransport or MessagePack::RPC::UNIXTransport. +# +# require 'msgpack/rpc' # gem install msgpack-rpc +# require 'msgpack/rpc/transport/udp' +# +# transport = MessagePack::RPC::UDPTransport.new +# address = MessagePack::RPC::Address.new('127.0.0.1', 18800) +# client = MessagePack::RPC::Client.new(transport, address) +# +# result = client.call(:methodName, arg1, arg2, arg3) +# +# === For servers +# +# For servers, use MessagePack::RPC::UDPServerTransport or MessagePack::RPC::UNIXServerTransport. +# +# require 'msgpack/rpc' # gem install msgpack-rpc +# require 'msgpack/rpc/transport/udp' +# +# class MyHandler +# def methodName(arg1, arg2, arg3) +# puts "received" +# return "return result." +# end +# end +# +# address = MessagePack::RPC::Address.new('0.0.0.0', 18800) +# listener = MessagePack::RPC::UDPServerTransport.new(address) +# server = MessagePack::RPC::Server.new +# server.listen(listener, MyHandler.new) +# server.run +# +# module RPC - - -class Error < StandardError end -class RPCError < Error - def initialize(msg) - super(msg) - end -end +end # module MessagePack -class RemoteError < RPCError - def initialize(msg, result = nil) - super(msg) - @result = result - end - attr_reader :result -end -class TimeoutError < Error - def initialize(msg = "request timed out") - super - end -end - -class ConnectError < TimeoutError - def initialize(msg = "connect failed") - super - end -end - - -REQUEST = 0 # [0, msgid, method, param] -RESPONSE = 1 # [1, msgid, error, result] -NOTIFY = 2 # [2, method, param] -SESSION = 3 # [3, addr] -OPTION = 4 # [4, txopt, rxopt] #[, rxproto, rxaddr] -OPT_DEFLATE = 0b00000001 -#PROTO_TCP = 0 -#PROTO_UDP = 1 - - -class Address - # +--+----+ - # | 2| 4 | - # +--+----+ - # port network byte order - # IPv4 address - # - # +--+----------------+ - # | 2| 16 | - # +--+----------------+ - # port network byte order - # IPv6 address - # - - test = Socket.pack_sockaddr_in(0,'0.0.0.0') - if test[0] == "\0"[0] || test[1] == "\0"[0] - # Linux - def initialize(host, port) - raw = Socket.pack_sockaddr_in(port, host) - family = raw.unpack('S')[0] - if family == Socket::AF_INET - @serial = raw[2,6] - elsif family == Socket::AF_INET6 - @serial = raw[2,2] + raw[8,16] - else - raise "Unknown address family: #{family}" - end - end - else - # BSD - def initialize(host, port) - raw = Socket.pack_sockaddr_in(port, host) - family = raw.unpack('CC')[1] - if family == Socket::AF_INET - @serial = raw[2,6] - elsif family == Socket::AF_INET6 - @serial = raw[2,2] + raw[8,16] - else - raise "Unknown address family: #{family}" - end - end - end - - def host - unpack[0] - end - - def port - unpack[1] - end - - def connectable? - port != 0 - end - - def sockaddr - Address.parse_sockaddr(@serial) - end - - def unpack - Address.parse(@serial) - end - - def self.parse_sockaddr(raw) - if raw.length == 6 - addr = Socket.pack_sockaddr_in(0, '0.0.0.0') - addr[2,6] = raw[0,6] - else - addr = Socket.pack_sockaddr_in(0, '::') - addr[2,2] = raw[0,2] - addr[8,16] = raw[2,16] - end - addr - end - - def self.parse(raw) - Socket.unpack_sockaddr_in(parse_sockaddr(raw)).reverse - end - - def self.load(raw) - Address.new *parse(raw) - end - - def dump - @serial - end - - def to_msgpack(out = '') - @serial.to_msgpack(out) - end - - def to_s - unpack.join(':') - end - - def to_a - unpack - end - - def inspect - "#<#{self.class} #{to_s} @serial=#{@serial.inspect}>" - end - - def eql?(o) - o.class == Address && dump.eql?(o.dump) - end - - def hash - dump.hash - end - - def ==(o) - eql?(o) - end -end - - -class Future - def initialize(s, loop, block = nil) - @s = s - @timeout = s.timeout - @loop = loop - @block = block - @error = nil - @result = nil - end - attr_reader :loop - attr_accessor :result, :error - - def attach_callback(proc = nil, &block) - @block = proc || block - end - - def call(err, res) - @error = err - @result = res - if @block - @block.call(err, res) - end - @s = nil - end - - def join - while @s - @loop.run_once - end - self - end - - def step_timeout - if @timeout < 1 - true - else - @timeout -= 1 - false - end - end -end - - -class Responder - def initialize(tran, msgid) - @tran = tran # send_message method is required - @msgid = msgid - end - - def result(retval, err = nil) - @tran.send_message [RESPONSE, @msgid, err, retval] - end - - def error(err, retval = nil) - result(retval, err) - end -end - - -class ExchangeOption - def initialize(flags = 0) - @flags = flags - end - - def get - @flags - end - - def deflate=(val) - val ? (@flags |= OPT_DEFLATE) : (@flags &= ~OPT_DEFLATE) - end - - def deflate? - @flags & OPT_DEFLATE != 0 - end - - def reset - @flags = 0 - self - end - - def ==(o) - @flags == o.get - end - - def to_msgpack(out = '') - @flags.to_msgpack(out) - end - - def self.from_msgpack(obj) - @flags = obj - end -end - -class StreamOption - def initialize(txopt = ExchangeOption.new, rxopt = ExchangeOption.new) #, rxproto = nil, rxaddr = nil) - @txopt = txopt - @rxopt = rxopt - #@rxproto = rxproto # nil or PROTO_TCP or PROTO_UDP - #@rxaddr = rxaddr # nil or address - end - attr_accessor :txopt, :rxopt #, :rxproto, :rxaddr - - def default? - @txopt.get == 0 && @rxopt.get == 0 && @rxproto.nil? - end - - def reset - @txopt.reset - @rxopt.reset - #@rxproto = nil - #@rxaddr = nil - self - end - - def ==(o) - @txopt == o.txopt && @rxopt == o.rxopt # && - #@rxproto == o.rxproto && @rxaddr == o.rxaddr - end - - def to_msgpack(out = '') - array = [OPTION, @txopt, @rxopt] - #if @rxproto - # array << @rxproto - # if @rxaddr - # array << @rxaddr - # end - #end - array.to_msgpack(out) - end - - def self.from_msgpack(obj) - txopt = ExchangeOption.new(obj[1] || 0) - rxopt = ExchangeOption.new(obj[2] || 0) - #if obj[3] - # rxproto = obj[3] - # if obj[4] - # rxaddr = Address.load(obj[4]) - # end - #end - StreamOption.new(txopt, rxopt) #, rxproto, rxaddr) - end -end - -class TransportOption < StreamOption - def initialize(*args) - super() - #@proto = PROTO_TCP - #@address = nil - args.each do |x| - case x - #when :tx_tcp, :tcp - # @proto = PROTO_TCP - #when :tx_udp, :udp - # @proto = PROTO_UDP - #when :rx_tcp - # @rxproto = PROTO_TCP - #when :rx_udp - # @rxproto = PROTO_UDP - when :deflate - @txopt.deflate = true - @rxopt.deflate = true - when :tx_deflate - @txopt.deflate = true - when :rx_deflate - @rxopt.deflate = true - #when PROTO_TCP - # @proto = PROTO_TCP - #when PROTO_UDP - # @proto = PROTO_UDP - #when Address - # @rxaddr = x - else - raise "unknown option #{x.inspect}" - end - end - end - #attr_accessor :proto, :address - - def reset - #@proto = PROTO_TCP - @address = nil - super - end - - def ==(o) - super(o) # && @proto = o.proto && @address = o.address - end - - #def deflate(val = true) txopt.deflate = va; rxopt.deflate = va; self end - #def rx_deflate(val = true) @rxopt.deflate = val; self; end - #def tx_deflate(val = true) @txopt.deflate = val; self; end - - #def tcp; @proto = PROTO_TCP; self; end - #def tx_tcp; @proto = PROTO_TCP; self; end - #def rx_tcp; @rxproto = PROTO_TCP; self; end - - #def udp; @proto = PROTO_UDP; self; end - #def rx_udp; @proto = PROTO_UDP; self; end - #def rx_udp; @rxproto = PROTO_UDP; self; end -end - - -class Session - def initialize(to_addr, dtopt, dispatcher, self_addr, loop) - @address = to_addr # destination address - @self_address = self_addr # self session identifier - @dispatcher = dispatcher || NullDispatcher.new - @dtopt = dtopt # default transport option - @dtran = create_transport(@dtopt) # default transport - @loop = loop - @trans = [] - @reqtable = {} - @timeout = 10 # FIXME default timeout time - reset - end - attr_reader :address, :self_address, :loop - attr_accessor :timeout - - def send(method, *args) - send_over(nil, method, *args) - end - - def callback(method, *args, &block) - callback_over(nil, method, *args, &block) - end - - def call(method, *args) - call_over(nil, method, *args) - end - - def notify(method, *args) - notify_over(nil, method, *args) - end - - def over(*args) - Over.new(self, TransportOption.new(*args)) - end - - def default_option - @dtopt.dup - end - - def send_over(topt, method, *args) - msgid = send_request(topt, method, args) - @reqtable[msgid] = Future.new(self, @loop) - end - - def callback_over(topt, method, *args, &block) - msgid = send_request(topt, method, args) - @reqtable[msgid] = Future.new(self, @loop, block) - end - - def call_over(topt, method, *args) - # FIXME if @reqtable.empty? optimize - req = send_over(topt, method, *args) - req.join - if req.error - raise req.error if req.error.is_a?(Error) - raise RemoteError.new(req.error, req.result) - end - req.result - end - - def notify_over(topt, method, *args) - send_notify(topt, method, args) - end - - def send_message_over(topt, msg) # for Over#send_message - get_transport(topt).send_message(msg) - end - - def close - @dtran.close - @trans.each {|tran| tran.close } - reset - self - end - - def step_timeout - reqs = [] - @reqtable.reject! {|msgid, req| - if req.step_timeout - reqs.push(req) - end - } - reqs.each {|req| - begin - req.call TimeoutError.new, nil - rescue - end - } - !@reqtable.empty? - end - - def on_message(sock, msg) # Session interface - case msg[0] - when REQUEST - on_request(sock, msg[1], msg[2], msg[3]) - when RESPONSE - on_response(sock, msg[1], msg[2], msg[3]) - when NOTIFY - on_notify(sock, msg[1], msg[2]) - when SESSION - # ignore because session is already bound - else - raise RPCError.new("unknown message type #{msg[0]}") - end - end - - def on_connect_failed - @reqtable.reject! {|msgid, req| - begin - req.call ConnectError.new, nil - rescue - end - true - } - # FIXME reset? - end - - private - def create_transport(topt) - TCPTransport.new(self, topt) - end - - def get_transport(topt) - if topt.nil? || @dtran.match?(topt) - return @dtran - end - if tran = @trans.find {|f| f.match?(topt) } - return tran - end - tran = create_transport(topt) - @trans.push(tran) - tran - end - - class Over - def initialize(session, topt) - @session = session - @topt = topt - end - - def send(method, *args) - @session.send_over(@topt, method, *args) - end - - def callback(method, *args, &block) - @session.callback_over(@topt, method, *args, &block) - end - - def call(method, *args) - @session.call_over(@topt, method, *args) - end - - def notify(method, *args) - @session.notify_over(@topt, method, *args) - end - - def send_message(msg) # Transport interface for Responder - @session.send_message_over(@topt, msg) - end - end - - def send_notify(topt, method, param) - unless @address - raise RPCError.new("unexpected send request on server session") - end - method = method.to_s unless method.is_a?(Integer) - tran = get_transport(topt) - tran.send_message([NOTIFY, method, param]) - end - - def send_request(topt, method, param) - unless @address - raise RPCError.new("unexpected send request on server session") - end - method = method.to_s unless method.is_a?(Integer) - tran = get_transport(topt) - msgid = @seqid - @seqid += 1; if @seqid >= 1<<31 then @seqid = 0 end - @reqtable[msgid] = Future.new(self, @loop) - tran.send_message([REQUEST, msgid, method, param]) - msgid - end - - def on_response(sock, msgid, error, result) - if req = @reqtable.delete(msgid) - req.call(error, result) - end - end - - def on_request(sock, msgid, method, param) - #if sock.option.rxproto - # #if @address - # tran = Over.new(self, sock.option) - #else - tran = sock - #end - res = Responder.new(tran, msgid) - @dispatcher.dispatch_request(self, method, param, res) - end - - def on_notify(sock, method, param) - @dispatcher.dispatch_notify(self, method, param) - end - - def on_close(sock) - @sockpool.delete(sock) - end - - def reset - @reqtable = {} - @seqid = 0 - end -end - - -class BasicTransport - def initialize(session, topt) - @session = session - @option = topt - end - - def match?(topt) - @option == topt - end - - #def close; end - - protected - def get_address - #@option.address || @session.address - @session.address - end -end - -class TCPTransport < BasicTransport - def initialize(session, topt) - super(session, topt) - - @pending = "" - @sockpool = [] - @connecting = 0 - @reconnect = 5 # FIXME default reconnect limit - - @initmsg = "" - if session.self_address - @initmsg << [SESSION, session.self_address].to_msgpack - end - unless topt.default? - @initmsg << topt.to_msgpack - end - @initmsg = nil if @initmsg.empty? - - if topt.txopt.deflate? - @deflate = Zlib::Deflate.new - else - @deflate = nil - end - end - - def send_message(msg) # Transport interface - if @sockpool.empty? - if @connecting == 0 - try_connect - @connecting = 1 - end - if @deflate - @pending << @deflate.deflate(msg.to_msgpack, Zlib::SYNC_FLUSH) - else - @pending << msg.to_msgpack - end - else - # FIXME pesudo connection load balance - sock = @sockpool.first - sock.send_message(msg) - end - end - - def close # Transport interface - @sockpool.reject! {|sock| - sock.detach if sock.attached? - sock.close - true - } - @sockpool = [] - @connecting = 0 - @pending = "" - @deflate.reset if @deflate - self - end - - def on_connect(sock) - @sockpool.push(sock) - sock.send_pending(@pending, @deflate) - @pending = "" - @deflate = Zlib::Deflate.new if @deflate - @connecting = 0 - end - - def on_connect_failed(sock) - if @connecting < @reconnect - try_connect - @connecting += 1 - else - @connecting = 0 - @pending = "" - @session.on_connect_failed - end - end - - def on_close(sock) - @sockpool.delete(sock) - end - - private - def try_connect - addr = get_address - if addr.nil? - return # FIXME raise? - end - host, port = *addr - sock = ActiveSocket.connect(host, port, self, @option, @session) # async connect - if @initmsg - sock.write @initmsg - end - @session.loop.attach(sock) - end -end - -class TCPTransport::Socket < Rev::TCPSocket - def initialize(sock, session) - @buffer = '' - @nread = 0 - @mpac = MessagePack::Unpacker.new - @deflate = nil - @inflate = nil - @s = session - super(sock) - end - - def session - @s - end - - def on_read(data) - if @inflate - data = @inflate.inflate(data) - return if data.empty? - end - @buffer << data - - while true - @nread = @mpac.execute(@buffer, @nread) - if @mpac.finished? - msg = @mpac.data - @mpac.reset - @buffer.slice!(0, @nread) - @nread = 0 - on_message(msg) - next unless @buffer.empty? - end - break - end - end - - def on_message(msg) - return unless @s - @s.on_message(self, msg) - end - - def on_close - @deflate.close if @deflate - @inflate.close if @inflate - end - - def send_message(msg) # Transport interface - if @deflate - data = @deflate.deflate(msg.to_msgpack, Zlib::SYNC_FLUSH) - else - data = msg.to_msgpack - end - write data - end -end - -class TCPTransport::ActiveSocket < TCPTransport::Socket - def initialize(sock, tran, topt, session) - super(sock, session) - @tran = tran - set_option(topt) - end - - def on_connect - return unless @tran - @tran.on_connect(self) - end - - def on_connect_failed - return unless @tran - @tran.on_connect_failed(self) - end - - def on_close - return unless @tran - @tran.on_close(self) - @tran = nil - @s = nil - rescue - nil - ensure - super - end - - def send_pending(data, z) - write data - @deflate = z - end - - private - def set_option(sopt) # stream option - if sopt.txopt.deflate? - @deflate = Zlib::Deflate.new - end - if sopt.rxopt.deflate? - @inflate = Zlib::Inflate.new - #@buffer = @inflate.inflate(@buffer) unless @buffer.empty? - end - end -end - -class TCPTransport::PassiveSocket < TCPTransport::Socket - def initialize(sock, create_session) - super(sock, create_session.call) - @option = TransportOption.new # active option (reversed) - end - - attr_reader :option # for Session#on_request - - def rebind(session) - @s = session - end - - def on_message(msg) - if msg[0] == OPTION - sopt = StreamOption.from_msgpack(msg) - set_option(sopt) - return - end - super(msg) - end - - private - def set_option(sopt) - if sopt.txopt.deflate? - @inflate = Zlib::Inflate.new - @buffer = @inflate.inflate(@buffer) unless @buffer.empty? - end - if sopt.rxopt.deflate? - @deflate = Zlib::Deflate.new - end - # rx-tx reverse - @option = TransportOption.new - @option.txopt = sopt.rxopt - @option.rxopt = sopt.txopt - #@option.proto = sopt.rxproto || PROTO_TCP - #@option.address = sopt.rxaddr - end -end - -class TCPTransport::Listener - def initialize(host, port, &create_session) - @lsock = Rev::TCPServer.new(host, port, TCPTransport::PassiveSocket, create_session) - end - - def activate(loop) - loop.attach(@lsock) - end - - def close - @lsock.detach if @lsock.attached? - @lsock.close - end -end - - -=begin -class UDPTransport < BasicTransport - def initialize(session, topt) - super(session, topt) - - initmsg = "" - if session.self_address - initmsg << [SESSION, session.self_address].to_msgpack - end - unless topt.default? - initmsg << topt.to_msgpack - end - initmsg = nil if initmsg.empty? - - sock = UDPSocket.new - @addr = get_address - @asock = ActiveSocket.new(sock, initmsg, topt, session) - end - - def send_message(msg) # Transport interface - @asock.send_message_to(msg, @addr) - end - - def close - @sock.close - @deflate.close if @deflate - end -end - -class UDPTransport::Socket < Rev::IO - def initialize(sock, create_session) - @sock = sock - @mpac = MessagePack::Unpacker.new - @create_session = create_session - super(sock) - end - - def on_readable - begin - buffer, from = @sock.recvfrom(65536) - rescue Errno::EAGAIN - return - rescue Errno::EINTR - return - end - # addr = Address.from_sockaddr(from) FIXME - on_recv(buffer, addr) - end - - def on_message(rc, msg) - return unless @s - @s.on_message(rc, msg) - end - - def send_message_with(initmsg, msg, deflate, addr) - if deflate - d2 = deflate.deflate(msg.to_msgpack, Zlib::SYNC_FLUSH) - else - d2 = msg.to_msgpack - end - if initmsg - data = d2 - else - data = initmsg.dup - data << d2 - end - @sock.sendto(data, 0, addr.sockaddr) - end -end - -class UDPTransport::ActiveSocket < UDPTransport::Socket - def initialize(sock, initmsg, topt, session) - @sock = sock - super(sock) - if topt.txopt.deflate? - @deflate = Zlib::Deflate.new - else - @deflate = nil - end - @initmsg = initmsg - @s = session - end - - def on_recv(data, addr) - rc = ReturnContext.new(self, addr) - if @inflate - data = @inflate.inflate(data) - @inflate.finish - return if data.empty? - end - nread = 0 - while true - nread = @mpac.execute(data, nread) - if @mpac.finished? - msg = @mpac.data - @mpac.reset - data.slice!(0, nread) - nread = 0 - on_message(rc, msg) - next unless data.empty? - else - @mpac.reset - end - break - end - end - - def send_message_to(msg, addr) - send_message_with(@initmsg, msg, @deflate, addr) - end - - private - class ReturnContext - def initialize(asock, addr) - @asock = asock - @addr = addr - end - - def send_message(msg) # Transport interface - @asock.send_message_to(msg, @addr) - end - end -end - -class UDPTransport::PassiveSocket < UDPTransport::Socket - def initialize(sock, create_session) - super(sock) - @create_session = create_session - @inflate = LazyInflate.new - @deflate = LazyInflate.new - end - - def on_recv(data, addr) - rc = PassiveReturnContext.new(self, addr) - nread = 0 - while true - nread = @mpac.execute(data, nread) - if @mpac.finished? - msg = @mpac.data - @mpac.reset - data.slice!(0, nread) - nread = 0 - data = on_message(data, rc, msg) - next unless data.empty? - else - @mpac.reset - end - break - end - end - - def on_message(data, rc, msg) - if msg[0] == OPTION - sopt = StreamOption.from_msgpack(msg) - return set_option(data, rc, sopt) - end - super(rc, msg) - end - - attr_reader :deflate # for ReturnContext#send_message - - private - class ReturnContext - def initialize(psock, addr) - @psock = psock - @addr = addr - @option = TransportOption.new - end - attr_accessor :option - - def send_message(msg) # Transport interface - if @option.txopt.deflate? - deflate = @psock.deflate.get - else - deflate = nil - end - @psock.send_message_with(nil, msg, deflate, @addr) - end - end - - class LazyDeflate - def initialize - @deflate = nil - end - def get - @deflate ||= Zlib::Deflate.new - end - end - - class LazyInflate - def initialize - @inflate = nil - end - def get - @inflate ||= Zlib::Inflate.new - end - end - - def set_option(data, rc, sopt) - if sopt.txopt.deflate? - inflate = @inflate.get - data = inflate.inflate(data) - @inflate.finish - end - # rx-tx reverse - rc.option.reset - rc.option.txopt = sopt.rxopt - rc.option.proto = sopt.rxproto || PROTO_UDP - rc.option.address = sopt.rxaddr - data - end -end - -class UDPTransport::Listener - def initialize(host, port, &create_session) - @sock = UDPSocket.new - @sock.bind(host, port) - @psock = PassiveSocket.new(@sock, &create_session) - end - - def activate(loop) - loop.attach(@psock) - end - - def close - @psock.detach if @psock.attached? - @psock.close - end -end -=end - - -class AsyncResult - def initialize - @responder = nil - @sent = false - end - - def result(retval, err = nil) - unless @sent - if @responder - @responder.result(retval, err) - else - @result = [retval, err] - end - @sent = true - end - nil - end - - def error(err) - result(nil, err) - nil - end - - def responder=(res) - @responder = res - if @sent && @result - @responder.result(*@result) - @result = nil - end - end -end - - -class NullDispatcher - def dispatch_request(session, method, param, res) - raise RPCError.new("unexpected request message") - end - - def dispatch_notify(session, method, param) - raise RPCError.new("unexpected notify message") - end -end - -class ObjectDispatcher - def initialize(obj, accept = obj.public_methods) - @obj = obj - @accept = accept.map {|m| m.is_a?(Integer) ? m : m.to_s} - end - - def dispatch_request(session, method, param, res) - begin - result = forward_method(session, method, param) - rescue - res.error($!.to_s) - return - end - if result.is_a?(AsyncResult) - result.responder = res - else - res.result(result) - end - end - - def dispatch_notify(session, method, param) - forward_method(session, method, param) - rescue - end - - private - def forward_method(session, method, param) - unless @accept.include?(method) - raise NoMethodError, "method `#{method}' is not accepted" - end - @obj.send(method, *param) { session } - end -end - - -Loop = Rev::Loop - - -module LoopUtil - attr_reader :loop - - class Timer < Rev::TimerWatcher - def initialize(interval, repeating, &block) - @block = block - super(interval, repeating) - end - def on_timer - @block.call - end - end - - def start_timer(interval, repeating, &block) - @loop.attach Timer.new(interval, repeating, &block) - end - - class TaskQueue < Rev::AsyncWatcher - def initialize - @queue = [] - super - end - - def push(task) - @queue.push(task) - signal - end - - def on_signal - while task = @queue.shift - begin - task.call - rescue - end - end - end - end - - def submit(task = nil, &block) - task ||= block - unless @queue - @queue = TaskQueue.new - @loop.attach(@queue) - end - @queue.push(task) - end - - def run - @loop.run - end - - def stop - @queue.detach if @queue && @queue.attached? - @loop.stop - # attach dummy timer - @loop.attach Rev::TimerWatcher.new(0, false) - nil - end -end - - -class Client < Session - def initialize(host, port, loop = Loop.new, dtopt = TransportOption.new) - @loop = loop - @host = host - @port = port - - addr = Address.new(host, port) - super(addr, dtopt, NullDispatcher.new, nil, loop) - - @timer = Timer.new(1, true) { - step_timeout - } - loop.attach(@timer) - end - attr_reader :host, :port - - def self.open(host, port, loop = Loop.new, dtopt = TransportOption.new, &block) - cli = new(host, port, loop, dtopt) - begin - block.call(cli) - ensure - cli.close - end - end - - def close - @timer.detach if @timer.attached? - super - end - - include LoopUtil -end - - -class SessionPool - def initialize(loop = Loop.new, dtopt = TransportOption.new) - @loop = loop - @spool = {} - @stimer = Timer.new(1, true, &method(:step_timeout)) - @dtopt = dtopt - loop.attach(@stimer) - end - - def get_session(host, port) - addr = Address.new(host, port) - get_session_addr(addr) - end - - def get_session_addr(addr) - @spool[addr] ||= create_session(addr) - end - - def close - @spool.reject! {|addr, s| - s.close - true - } - @stimer.detach if @stimer.attached? - nil - end - - include LoopUtil - - protected - def create_session(addr) - Session.new(addr, @dtopt, nil, nil, @loop) - end - - private - def step_timeout - @spool.each_pair {|addr, s| - s.step_timeout - } - end -end - - -class Server < SessionPool - def initialize(loop = Loop.new) - super(loop, TransportOption.new) - @dispatcher = nil - @listeners = [] - end - - def serve(obj, accept = obj.public_methods) - @dispatcher = ObjectDispatcher.new(obj, accept) - end - - def listen(host, port, obj = nil, accept = obj.public_methods) - unless obj.nil? - serve(obj, accept) - end - listen_real(host, port) - self - end - - def listen_real(host, port) #, proto = PROTO_TCP) - creator = Proc.new { create_session(nil) } - #case proto - #when PROTO_TCP, :tcp - # listener = TCPTransport::Listener.new(host, port, &creator) - #when PROTO_UDP, :udp - # listener = UDPTransport::Listener.new(host, port, &creator) - #else - # raise "unknown option: #{proto.inspect}" - #end - listener = TCPTransport::Listener.new(host, port, &creator) - @listeners.push(listener) - listener.activate(@loop) - self - end - - def close - @listeners.reject! {|listener| - listener.close - true - } - super - end - - protected - def create_session(addr) - Session.new(addr, @dtopt, @dispatcher, nil, @loop) - end -end - - -class Cluster < SessionPool - def initialize(host, port, loop = Loop.new, dtopt = TransportOption.new) - super(loop, dtopt) - @host = host - @port = port - @dispatcher = nil - @self_addr = Address.new(host, port) - @listeners = [] - listen(host, port) # FIXME obsolete? - end - attr_reader :host, :port - - def serve(obj, accept = obj.public_methods) - @dispatcher = ObjectDispatcher.new(obj, accept) - self - end - - def listen(host, port) #, proto = PROTO_TCP) - lz = LazyBinder.new(self) - creator = Proc.new { lz } - #case proto - #when PROTO_TCP, :tcp - # listener = TCPTransport::Listener.new(host, port, &creator) - #when PROTO_UDP, :udp - # listener = UDPTransport::Listener.new(host, port, &creator) - #else - # raise "unknown option: #{proto.inspect}" - #end - listener = TCPTransport::Listener.new(host, port, &creator) - @listeners.push(listener) - listener.activate(@loop) - self - end - - def close - if @lsock - @lsock.detach if @lsock.attached? - @lsock.close - end - super - end - - include LoopUtil - - protected - def create_session(addr) - Session.new(addr, @dtopt, @dispatcher, @self_addr, @loop) - end - - public - def create_server_session - # FIXME - Session.new(nil, @dtopt, @dispatcher, @self_addr, @loop) - end - - private - class LazyBinder < Session - def initialize(cluster) - @cluster = cluster - end - - def on_message(sock, msg) # Session interface - if msg[0] == SESSION - # cluster - addr = Address.load(msg[1]) - session = @cluster.get_session_addr(addr) - sock.rebind(session) - else - # subsys - session = @cluster.create_server_session - sock.rebind(session) - sock.on_message(msg) - end - end - end -end - - -end -end - +require 'msgpack' +require 'socket' +require 'rev' +require 'msgpack/rpc/address' +require 'msgpack/rpc/message' +require 'msgpack/rpc/exception' +require 'msgpack/rpc/loop' +require 'msgpack/rpc/future' +require 'msgpack/rpc/session' +require 'msgpack/rpc/session_pool' +require 'msgpack/rpc/dispatcher' +require 'msgpack/rpc/client' +require 'msgpack/rpc/server' +require 'msgpack/rpc/transport/base' +require 'msgpack/rpc/transport/tcp'