lib/msgpack/rpc.rb in msgpack-rpc-0.2.0 vs lib/msgpack/rpc.rb in msgpack-rpc-0.3.0
- old
+ new
@@ -1,9 +1,9 @@
#
# MessagePack-RPC for Ruby
#
-# Copyright (C) 2009 FURUHASHI Sadayuki
+# Copyright (C) 2010 FURUHASHI Sadayuki
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
@@ -15,10 +15,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'msgpack'
require 'rev'
+require 'zlib'
module MessagePack
module RPC
@@ -49,26 +50,21 @@
def initialize(msg = "connect failed")
super
end
end
-class Responder
- def initialize(socket, msgid)
- @socket = socket
- @msgid = msgid
- end
- def result(retval, err = nil)
- @socket.write RPCSocket.pack_response(@msgid, retval, err)
- end
+REQUEST = 0 # [0, msgid, method, param]
+RESPONSE = 1 # [1, msgid, error, result]
+NOTIFY = 2 # [2, method, param]
+SESSION = 3 # [3, addr]
+OPTION = 4 # [4, txopt, rxopt] #[, rxproto, rxaddr]
+OPT_DEFLATE = 0b00000001
+#PROTO_TCP = 0
+#PROTO_UDP = 1
- def error(err)
- result(nil, err)
- end
-end
-
class Address
# +--+----+
# | 2| 4 |
# +--+----+
# port network byte order
@@ -82,30 +78,34 @@
#
test = Socket.pack_sockaddr_in(0,'0.0.0.0')
if test[0] == "\0"[0] || test[1] == "\0"[0]
# Linux
- IMPL_LINUX = true
+ def initialize(host, port)
+ raw = Socket.pack_sockaddr_in(port, host)
+ family = raw.unpack('S')[0]
+ if family == Socket::AF_INET
+ @serial = raw[2,6]
+ elsif family == Socket::AF_INET6
+ @serial = raw[2,2] + raw[8,16]
+ else
+ raise "Unknown address family: #{family}"
+ end
+ end
else
# BSD
- IMPL_LINUX = false
- end
-
- def initialize(host, port)
- raw = Socket.pack_sockaddr_in(port, host)
- if IMPL_LINUX
- family = raw.unpack('S')[0]
- else
+ def initialize(host, port)
+ raw = Socket.pack_sockaddr_in(port, host)
family = raw.unpack('CC')[1]
+ if family == Socket::AF_INET
+ @serial = raw[2,6]
+ elsif family == Socket::AF_INET6
+ @serial = raw[2,2] + raw[8,16]
+ else
+ raise "Unknown address family: #{family}"
+ end
end
- if family == Socket::AF_INET
- @serial = raw[2,6]
- elsif family == Socket::AF_INET6
- @serial = raw[2,2] + raw[8,16]
- else
- raise "Unknown address family: #{family}"
- end
end
def host
unpack[0]
end
@@ -164,11 +164,10 @@
def inspect
"#<#{self.class} #{to_s} @serial=#{@serial.inspect}>"
end
-
def eql?(o)
o.class == Address && dump.eql?(o.dump)
end
def hash
@@ -179,257 +178,285 @@
eql?(o)
end
end
-REQUEST = 0
-RESPONSE = 1
-NOTIFY = 2
-INIT = 3
-
-
-class RPCSocket < Rev::TCPSocket
- def initialize(sock, session)
- @buffer = ''
- @nread = 0
- @mpac = MessagePack::Unpacker.new
- @s = session
- super(sock)
+class Future
+ def initialize(s, loop, block = nil)
+ @s = s
+ @timeout = s.timeout
+ @loop = loop
+ @block = block
+ @error = nil
+ @result = nil
end
+ attr_reader :loop
+ attr_accessor :result, :error
- def bind_session(s)
- @s = s
+ def attach_callback(proc = nil, &block)
+ @block = proc || block
end
- def bound?
- !@s.nil?
+ def call(err, res)
+ @error = err
+ @result = res
+ if @block
+ @block.call(err, res)
+ end
+ @s = nil
end
- def on_read(data)
- @buffer << data
-
- while true
- @nread = @mpac.execute(@buffer, @nread)
-
- if @mpac.finished?
- msg = @mpac.data
-
- @mpac.reset
- @buffer.slice!(0, @nread)
- @nread = 0
-
- on_message(msg) # RPCSocket#on_message
-
- next unless @buffer.empty?
- end
-
- break
+ def join
+ while @s
+ @loop.run_once
end
+ self
end
- def on_message(msg)
- case msg[0]
- when REQUEST
- on_request(msg[1], msg[2], msg[3])
- when RESPONSE
- on_response(msg[1], msg[2], msg[3])
- when NOTIFY
- on_notify(msg[1], msg[2])
- when INIT
- on_init(msg[1])
+ def step_timeout
+ if @timeout < 1
+ true
else
- raise RPCError.new("unknown message type #{msg[0]}")
+ @timeout -= 1
+ false
end
end
+end
- def on_connect
- return unless @s
- @s.on_connect(self)
- end
- def on_connect_failed
- return unless @s
- @s.on_connect_failed(self)
+class Responder
+ def initialize(tran, msgid)
+ @tran = tran # send_message method is required
+ @msgid = msgid
end
- def on_close
- return unless @s
- @s.on_close(self)
- @s = nil
- rescue
- nil
+ def result(retval, err = nil)
+ @tran.send_message [RESPONSE, @msgid, err, retval]
end
- def on_request(msgid, method, param)
- return unless @s
- @s.on_request(method, param, Responder.new(self,msgid))
+ def error(err, retval = nil)
+ result(retval, err)
end
+end
- def on_notify(method, param)
- return unless @s
- @s.on_notify(method, param)
+
+class ExchangeOption
+ def initialize(flags = 0)
+ @flags = flags
end
- def on_response(msgid, error, result)
- return unless @s
- @s.on_response(msgid, error, result)
+ def get
+ @flags
end
- def on_init(msg)
- # ignore
+ def deflate=(val)
+ val ? (@flags |= OPT_DEFLATE) : (@flags &= ~OPT_DEFLATE)
end
- def send_message(msg)
- write msg.to_msgpack
+ def deflate?
+ @flags & OPT_DEFLATE != 0
end
- def self.pack_request(msgid, method, param)
- [REQUEST, msgid, method, param].to_msgpack
+ def reset
+ @flags = 0
+ self
end
- def self.pack_response(msgid, result, error)
- [RESPONSE, msgid, error, result].to_msgpack
+ def ==(o)
+ @flags == o.get
end
- def self.pack_notify(method, param)
- [NOTIFY, method, param].to_msgpack
+ def to_msgpack(out = '')
+ @flags.to_msgpack(out)
end
- def self.pack_init(msg)
- [INIT, msg].to_msgpack
+ def self.from_msgpack(obj)
+ @flags = obj
end
end
+class StreamOption
+ def initialize(txopt = ExchangeOption.new, rxopt = ExchangeOption.new) #, rxproto = nil, rxaddr = nil)
+ @txopt = txopt
+ @rxopt = rxopt
+ #@rxproto = rxproto # nil or PROTO_TCP or PROTO_UDP
+ #@rxaddr = rxaddr # nil or address
+ end
+ attr_accessor :txopt, :rxopt #, :rxproto, :rxaddr
-class Session
+ def default?
+ @txopt.get == 0 && @rxopt.get == 0 && @rxproto.nil?
+ end
- class BasicRequest
- def initialize(s, loop)
- @s = s
- @timeout = s.timeout
- @loop = loop
- end
- attr_reader :loop
+ def reset
+ @txopt.reset
+ @rxopt.reset
+ #@rxproto = nil
+ #@rxaddr = nil
+ self
+ end
- def call(err, res)
- @s = nil
- end
+ def ==(o)
+ @txopt == o.txopt && @rxopt == o.rxopt # &&
+ #@rxproto == o.rxproto && @rxaddr == o.rxaddr
+ end
- def join
- while @s
- @loop.run_once
- end
- self
- end
+ def to_msgpack(out = '')
+ array = [OPTION, @txopt, @rxopt]
+ #if @rxproto
+ # array << @rxproto
+ # if @rxaddr
+ # array << @rxaddr
+ # end
+ #end
+ array.to_msgpack(out)
+ end
- def step_timeout
- if @timeout < 1
- true
+ def self.from_msgpack(obj)
+ txopt = ExchangeOption.new(obj[1] || 0)
+ rxopt = ExchangeOption.new(obj[2] || 0)
+ #if obj[3]
+ # rxproto = obj[3]
+ # if obj[4]
+ # rxaddr = Address.load(obj[4])
+ # end
+ #end
+ StreamOption.new(txopt, rxopt) #, rxproto, rxaddr)
+ end
+end
+
+class TransportOption < StreamOption
+ def initialize(*args)
+ super()
+ #@proto = PROTO_TCP
+ #@address = nil
+ args.each do |x|
+ case x
+ #when :tx_tcp, :tcp
+ # @proto = PROTO_TCP
+ #when :tx_udp, :udp
+ # @proto = PROTO_UDP
+ #when :rx_tcp
+ # @rxproto = PROTO_TCP
+ #when :rx_udp
+ # @rxproto = PROTO_UDP
+ when :deflate
+ @txopt.deflate = true
+ @rxopt.deflate = true
+ when :tx_deflate
+ @txopt.deflate = true
+ when :rx_deflate
+ @rxopt.deflate = true
+ #when PROTO_TCP
+ # @proto = PROTO_TCP
+ #when PROTO_UDP
+ # @proto = PROTO_UDP
+ #when Address
+ # @rxaddr = x
else
- @timeout -= 1
- false
+ raise "unknown option #{x.inspect}"
end
end
end
+ #attr_accessor :proto, :address
- class AsyncRequest < BasicRequest
- def initialize(s, loop)
- super(s, loop)
- @error = nil
- @result = nil
- end
- attr_accessor :result, :error
+ def reset
+ #@proto = PROTO_TCP
+ @address = nil
+ super
+ end
- def call(err, res)
- @error = err
- @result = res
- @s = nil
- end
+ def ==(o)
+ super(o) # && @proto = o.proto && @address = o.address
end
- class CallbackRequest < BasicRequest
- def initialize(s, loop, block)
- super(s, loop)
- @block = block
- end
+ #def deflate(val = true) txopt.deflate = va; rxopt.deflate = va; self end
+ #def rx_deflate(val = true) @rxopt.deflate = val; self; end
+ #def tx_deflate(val = true) @txopt.deflate = val; self; end
- def call(err, res)
- @block.call(err, res)
- end
- end
+ #def tcp; @proto = PROTO_TCP; self; end
+ #def tx_tcp; @proto = PROTO_TCP; self; end
+ #def rx_tcp; @rxproto = PROTO_TCP; self; end
+ #def udp; @proto = PROTO_UDP; self; end
+ #def rx_udp; @proto = PROTO_UDP; self; end
+ #def rx_udp; @rxproto = PROTO_UDP; self; end
+end
- def initialize(initmsg, target_addr, dispatcher, loop)
- @initmsg = initmsg
- @target_addr = target_addr
+
+class Session
+ def initialize(to_addr, dtopt, dispatcher, self_addr, loop)
+ @address = to_addr # destination address
+ @self_address = self_addr # self session identifier
@dispatcher = dispatcher || NullDispatcher.new
+ @dtopt = dtopt # default transport option
+ @dtran = create_transport(@dtopt) # default transport
@loop = loop
+ @trans = []
+ @reqtable = {}
@timeout = 10 # FIXME default timeout time
- @reconnect = 5 # FIXME default reconnect limit
reset
end
- attr_accessor :timeout, :reconnect
+ attr_reader :address, :self_address, :loop
+ attr_accessor :timeout
- def address
- @target_addr
+ def send(method, *args)
+ send_over(nil, method, *args)
end
- def on_connect(sock)
- @sockpool.push(sock)
- sock.write @pending
- @pending = ""
- @connecting = 0
+ def callback(method, *args, &block)
+ callback_over(nil, method, *args, &block)
end
- def on_connect_failed(sock)
- if @connecting < @reconnect
- try_connect
- @connecting += 1
- else
- @connecting = 0
- @reqtable.reject! {|msgid, req|
- begin
- req.call ConnectError.new, nil
- rescue
- end
- true
- }
- end
+ def call(method, *args)
+ call_over(nil, method, *args)
end
+ def notify(method, *args)
+ notify_over(nil, method, *args)
+ end
- def send(method, *args)
- msgid = send_request(method, args)
- @reqtable[msgid] = AsyncRequest.new(self, @loop)
+ def over(*args)
+ Over.new(self, TransportOption.new(*args))
end
- def callback(method, *args, &block)
- msgid = send_request(method, args)
- @reqtable[msgid] = CallbackRequest.new(self, @loop, block)
+ def default_option
+ @dtopt.dup
end
- def call(method, *args)
+ def send_over(topt, method, *args)
+ msgid = send_request(topt, method, args)
+ @reqtable[msgid] = Future.new(self, @loop)
+ end
+
+ def callback_over(topt, method, *args, &block)
+ msgid = send_request(topt, method, args)
+ @reqtable[msgid] = Future.new(self, @loop, block)
+ end
+
+ def call_over(topt, method, *args)
# FIXME if @reqtable.empty? optimize
- req = send(method, *args)
+ req = send_over(topt, method, *args)
req.join
if req.error
raise req.error if req.error.is_a?(Error)
raise RemoteError.new(req.error, req.result)
end
req.result
end
- def notify(method, *args)
- send_notify(method, args)
+ def notify_over(topt, method, *args)
+ send_notify(topt, method, args)
end
+ def send_message_over(topt, msg) # for Over#send_message
+ get_transport(topt).send_message(msg)
+ end
+
def close
- @sockpool.reject! {|sock|
- sock.detach if sock.attached?
- sock.close
- true
- }
+ @dtran.close
+ @trans.each {|tran| tran.close }
reset
self
end
def step_timeout
@@ -446,81 +473,635 @@
end
}
!@reqtable.empty?
end
+ def on_message(sock, msg) # Session interface
+ case msg[0]
+ when REQUEST
+ on_request(sock, msg[1], msg[2], msg[3])
+ when RESPONSE
+ on_response(sock, msg[1], msg[2], msg[3])
+ when NOTIFY
+ on_notify(sock, msg[1], msg[2])
+ when SESSION
+ # ignore because session is already bound
+ else
+ raise RPCError.new("unknown message type #{msg[0]}")
+ end
+ end
- def on_response(msgid, error, result)
+ def on_connect_failed
+ @reqtable.reject! {|msgid, req|
+ begin
+ req.call ConnectError.new, nil
+ rescue
+ end
+ true
+ }
+ # FIXME reset?
+ end
+
+ private
+ def create_transport(topt)
+ TCPTransport.new(self, topt)
+ end
+
+ def get_transport(topt)
+ if topt.nil? || @dtran.match?(topt)
+ return @dtran
+ end
+ if tran = @trans.find {|f| f.match?(topt) }
+ return tran
+ end
+ tran = create_transport(topt)
+ @trans.push(tran)
+ tran
+ end
+
+ class Over
+ def initialize(session, topt)
+ @session = session
+ @topt = topt
+ end
+
+ def send(method, *args)
+ @session.send_over(@topt, method, *args)
+ end
+
+ def callback(method, *args, &block)
+ @session.callback_over(@topt, method, *args, &block)
+ end
+
+ def call(method, *args)
+ @session.call_over(@topt, method, *args)
+ end
+
+ def notify(method, *args)
+ @session.notify_over(@topt, method, *args)
+ end
+
+ def send_message(msg) # Transport interface for Responder
+ @session.send_message_over(@topt, msg)
+ end
+ end
+
+ def send_notify(topt, method, param)
+ unless @address
+ raise RPCError.new("unexpected send request on server session")
+ end
+ method = method.to_s unless method.is_a?(Integer)
+ tran = get_transport(topt)
+ tran.send_message([NOTIFY, method, param])
+ end
+
+ def send_request(topt, method, param)
+ unless @address
+ raise RPCError.new("unexpected send request on server session")
+ end
+ method = method.to_s unless method.is_a?(Integer)
+ tran = get_transport(topt)
+ msgid = @seqid
+ @seqid += 1; if @seqid >= 1<<31 then @seqid = 0 end
+ @reqtable[msgid] = Future.new(self, @loop)
+ tran.send_message([REQUEST, msgid, method, param])
+ msgid
+ end
+
+ def on_response(sock, msgid, error, result)
if req = @reqtable.delete(msgid)
req.call(error, result)
end
end
- def on_request(method, param, res)
+ def on_request(sock, msgid, method, param)
+ #if sock.option.rxproto
+ # #if @address
+ # tran = Over.new(self, sock.option)
+ #else
+ tran = sock
+ #end
+ res = Responder.new(tran, msgid)
@dispatcher.dispatch_request(self, method, param, res)
end
- def on_notify(method, param)
+ def on_notify(sock, method, param)
@dispatcher.dispatch_notify(self, method, param)
end
def on_close(sock)
@sockpool.delete(sock)
end
-
- private
def reset
- @sockpool = []
@reqtable = {}
@seqid = 0
- @pending = ""
- @connecting = 0
end
+end
- def send_request(method, param)
- method = method.to_s unless method.is_a?(Integer)
- msgid = @seqid
- @seqid += 1; if @seqid >= 1<<31 then @seqid = 0 end
- send_data RPCSocket.pack_request(msgid, method, param)
- msgid
+
+class BasicTransport
+ def initialize(session, topt)
+ @session = session
+ @option = topt
end
- def send_notify(method, param)
- method = method.to_s unless method.is_a?(Integer)
- send_data RPCSocket.pack_notify(method, param)
- nil
+ def match?(topt)
+ @option == topt
end
- def send_data(msg)
- unless @target_addr
- raise RPCError.new("unexpected send request on server session")
+ #def close; end
+
+ protected
+ def get_address
+ #@option.address || @session.address
+ @session.address
+ end
+end
+
+class TCPTransport < BasicTransport
+ def initialize(session, topt)
+ super(session, topt)
+
+ @pending = ""
+ @sockpool = []
+ @connecting = 0
+ @reconnect = 5 # FIXME default reconnect limit
+
+ @initmsg = ""
+ if session.self_address
+ @initmsg << [SESSION, session.self_address].to_msgpack
end
+ unless topt.default?
+ @initmsg << topt.to_msgpack
+ end
+ @initmsg = nil if @initmsg.empty?
+
+ if topt.txopt.deflate?
+ @deflate = Zlib::Deflate.new
+ else
+ @deflate = nil
+ end
+ end
+
+ def send_message(msg) # Transport interface
if @sockpool.empty?
if @connecting == 0
try_connect
@connecting = 1
end
- @pending << msg
+ if @deflate
+ @pending << @deflate.deflate(msg.to_msgpack, Zlib::SYNC_FLUSH)
+ else
+ @pending << msg.to_msgpack
+ end
else
# FIXME pesudo connection load balance
sock = @sockpool.first
- sock.write msg
+ sock.send_message(msg)
end
end
+ def close # Transport interface
+ @sockpool.reject! {|sock|
+ sock.detach if sock.attached?
+ sock.close
+ true
+ }
+ @sockpool = []
+ @connecting = 0
+ @pending = ""
+ @deflate.reset if @deflate
+ self
+ end
+
+ def on_connect(sock)
+ @sockpool.push(sock)
+ sock.send_pending(@pending, @deflate)
+ @pending = ""
+ @deflate = Zlib::Deflate.new if @deflate
+ @connecting = 0
+ end
+
+ def on_connect_failed(sock)
+ if @connecting < @reconnect
+ try_connect
+ @connecting += 1
+ else
+ @connecting = 0
+ @pending = ""
+ @session.on_connect_failed
+ end
+ end
+
+ def on_close(sock)
+ @sockpool.delete(sock)
+ end
+
+ private
def try_connect
- port, host = ::Socket.unpack_sockaddr_in(@target_addr.sockaddr)
- sock = RPCSocket.connect(host, port, self) # async connect
+ addr = get_address
+ if addr.nil?
+ return # FIXME raise?
+ end
+ host, port = *addr
+ sock = ActiveSocket.connect(host, port, self, @option, @session) # async connect
if @initmsg
sock.write @initmsg
end
- @loop.attach(sock)
+ @session.loop.attach(sock)
end
end
+class TCPTransport::Socket < Rev::TCPSocket
+ def initialize(sock, session)
+ @buffer = ''
+ @nread = 0
+ @mpac = MessagePack::Unpacker.new
+ @deflate = nil
+ @inflate = nil
+ @s = session
+ super(sock)
+ end
+ def session
+ @s
+ end
+
+ def on_read(data)
+ if @inflate
+ data = @inflate.inflate(data)
+ return if data.empty?
+ end
+ @buffer << data
+
+ while true
+ @nread = @mpac.execute(@buffer, @nread)
+ if @mpac.finished?
+ msg = @mpac.data
+ @mpac.reset
+ @buffer.slice!(0, @nread)
+ @nread = 0
+ on_message(msg)
+ next unless @buffer.empty?
+ end
+ break
+ end
+ end
+
+ def on_message(msg)
+ return unless @s
+ @s.on_message(self, msg)
+ end
+
+ def on_close
+ @deflate.close if @deflate
+ @inflate.close if @inflate
+ end
+
+ def send_message(msg) # Transport interface
+ if @deflate
+ data = @deflate.deflate(msg.to_msgpack, Zlib::SYNC_FLUSH)
+ else
+ data = msg.to_msgpack
+ end
+ write data
+ end
+end
+
+class TCPTransport::ActiveSocket < TCPTransport::Socket
+ def initialize(sock, tran, topt, session)
+ super(sock, session)
+ @tran = tran
+ set_option(topt)
+ end
+
+ def on_connect
+ return unless @tran
+ @tran.on_connect(self)
+ end
+
+ def on_connect_failed
+ return unless @tran
+ @tran.on_connect_failed(self)
+ end
+
+ def on_close
+ return unless @tran
+ @tran.on_close(self)
+ @tran = nil
+ @s = nil
+ rescue
+ nil
+ ensure
+ super
+ end
+
+ def send_pending(data, z)
+ write data
+ @deflate = z
+ end
+
+ private
+ def set_option(sopt) # stream option
+ if sopt.txopt.deflate?
+ @deflate = Zlib::Deflate.new
+ end
+ if sopt.rxopt.deflate?
+ @inflate = Zlib::Inflate.new
+ #@buffer = @inflate.inflate(@buffer) unless @buffer.empty?
+ end
+ end
+end
+
+class TCPTransport::PassiveSocket < TCPTransport::Socket
+ def initialize(sock, create_session)
+ super(sock, create_session.call)
+ @option = TransportOption.new # active option (reversed)
+ end
+
+ attr_reader :option # for Session#on_request
+
+ def rebind(session)
+ @s = session
+ end
+
+ def on_message(msg)
+ if msg[0] == OPTION
+ sopt = StreamOption.from_msgpack(msg)
+ set_option(sopt)
+ return
+ end
+ super(msg)
+ end
+
+ private
+ def set_option(sopt)
+ if sopt.txopt.deflate?
+ @inflate = Zlib::Inflate.new
+ @buffer = @inflate.inflate(@buffer) unless @buffer.empty?
+ end
+ if sopt.rxopt.deflate?
+ @deflate = Zlib::Deflate.new
+ end
+ # rx-tx reverse
+ @option = TransportOption.new
+ @option.txopt = sopt.rxopt
+ @option.rxopt = sopt.txopt
+ #@option.proto = sopt.rxproto || PROTO_TCP
+ #@option.address = sopt.rxaddr
+ end
+end
+
+class TCPTransport::Listener
+ def initialize(host, port, &create_session)
+ @lsock = Rev::TCPServer.new(host, port, TCPTransport::PassiveSocket, create_session)
+ end
+
+ def activate(loop)
+ loop.attach(@lsock)
+ end
+
+ def close
+ @lsock.detach if @lsock.attached?
+ @lsock.close
+ end
+end
+
+
+=begin
+class UDPTransport < BasicTransport
+ def initialize(session, topt)
+ super(session, topt)
+
+ initmsg = ""
+ if session.self_address
+ initmsg << [SESSION, session.self_address].to_msgpack
+ end
+ unless topt.default?
+ initmsg << topt.to_msgpack
+ end
+ initmsg = nil if initmsg.empty?
+
+ sock = UDPSocket.new
+ @addr = get_address
+ @asock = ActiveSocket.new(sock, initmsg, topt, session)
+ end
+
+ def send_message(msg) # Transport interface
+ @asock.send_message_to(msg, @addr)
+ end
+
+ def close
+ @sock.close
+ @deflate.close if @deflate
+ end
+end
+
+class UDPTransport::Socket < Rev::IO
+ def initialize(sock, create_session)
+ @sock = sock
+ @mpac = MessagePack::Unpacker.new
+ @create_session = create_session
+ super(sock)
+ end
+
+ def on_readable
+ begin
+ buffer, from = @sock.recvfrom(65536)
+ rescue Errno::EAGAIN
+ return
+ rescue Errno::EINTR
+ return
+ end
+ # addr = Address.from_sockaddr(from) FIXME
+ on_recv(buffer, addr)
+ end
+
+ def on_message(rc, msg)
+ return unless @s
+ @s.on_message(rc, msg)
+ end
+
+ def send_message_with(initmsg, msg, deflate, addr)
+ if deflate
+ d2 = deflate.deflate(msg.to_msgpack, Zlib::SYNC_FLUSH)
+ else
+ d2 = msg.to_msgpack
+ end
+ if initmsg
+ data = d2
+ else
+ data = initmsg.dup
+ data << d2
+ end
+ @sock.sendto(data, 0, addr.sockaddr)
+ end
+end
+
+class UDPTransport::ActiveSocket < UDPTransport::Socket
+ def initialize(sock, initmsg, topt, session)
+ @sock = sock
+ super(sock)
+ if topt.txopt.deflate?
+ @deflate = Zlib::Deflate.new
+ else
+ @deflate = nil
+ end
+ @initmsg = initmsg
+ @s = session
+ end
+
+ def on_recv(data, addr)
+ rc = ReturnContext.new(self, addr)
+ if @inflate
+ data = @inflate.inflate(data)
+ @inflate.finish
+ return if data.empty?
+ end
+ nread = 0
+ while true
+ nread = @mpac.execute(data, nread)
+ if @mpac.finished?
+ msg = @mpac.data
+ @mpac.reset
+ data.slice!(0, nread)
+ nread = 0
+ on_message(rc, msg)
+ next unless data.empty?
+ else
+ @mpac.reset
+ end
+ break
+ end
+ end
+
+ def send_message_to(msg, addr)
+ send_message_with(@initmsg, msg, @deflate, addr)
+ end
+
+ private
+ class ReturnContext
+ def initialize(asock, addr)
+ @asock = asock
+ @addr = addr
+ end
+
+ def send_message(msg) # Transport interface
+ @asock.send_message_to(msg, @addr)
+ end
+ end
+end
+
+class UDPTransport::PassiveSocket < UDPTransport::Socket
+ def initialize(sock, create_session)
+ super(sock)
+ @create_session = create_session
+ @inflate = LazyInflate.new
+ @deflate = LazyInflate.new
+ end
+
+ def on_recv(data, addr)
+ rc = PassiveReturnContext.new(self, addr)
+ nread = 0
+ while true
+ nread = @mpac.execute(data, nread)
+ if @mpac.finished?
+ msg = @mpac.data
+ @mpac.reset
+ data.slice!(0, nread)
+ nread = 0
+ data = on_message(data, rc, msg)
+ next unless data.empty?
+ else
+ @mpac.reset
+ end
+ break
+ end
+ end
+
+ def on_message(data, rc, msg)
+ if msg[0] == OPTION
+ sopt = StreamOption.from_msgpack(msg)
+ return set_option(data, rc, sopt)
+ end
+ super(rc, msg)
+ end
+
+ attr_reader :deflate # for ReturnContext#send_message
+
+ private
+ class ReturnContext
+ def initialize(psock, addr)
+ @psock = psock
+ @addr = addr
+ @option = TransportOption.new
+ end
+ attr_accessor :option
+
+ def send_message(msg) # Transport interface
+ if @option.txopt.deflate?
+ deflate = @psock.deflate.get
+ else
+ deflate = nil
+ end
+ @psock.send_message_with(nil, msg, deflate, @addr)
+ end
+ end
+
+ class LazyDeflate
+ def initialize
+ @deflate = nil
+ end
+ def get
+ @deflate ||= Zlib::Deflate.new
+ end
+ end
+
+ class LazyInflate
+ def initialize
+ @inflate = nil
+ end
+ def get
+ @inflate ||= Zlib::Inflate.new
+ end
+ end
+
+ def set_option(data, rc, sopt)
+ if sopt.txopt.deflate?
+ inflate = @inflate.get
+ data = inflate.inflate(data)
+ @inflate.finish
+ end
+ # rx-tx reverse
+ rc.option.reset
+ rc.option.txopt = sopt.rxopt
+ rc.option.proto = sopt.rxproto || PROTO_UDP
+ rc.option.address = sopt.rxaddr
+ data
+ end
+end
+
+class UDPTransport::Listener
+ def initialize(host, port, &create_session)
+ @sock = UDPSocket.new
+ @sock.bind(host, port)
+ @psock = PassiveSocket.new(@sock, &create_session)
+ end
+
+ def activate(loop)
+ loop.attach(@psock)
+ end
+
+ def close
+ @psock.detach if @psock.attached?
+ @psock.close
+ end
+end
+=end
+
+
class AsyncResult
def initialize
@responder = nil
@sent = false
end
@@ -660,27 +1241,27 @@
end
end
class Client < Session
- def initialize(host, port, loop = Loop.new)
+ def initialize(host, port, loop = Loop.new, dtopt = TransportOption.new)
@loop = loop
@host = host
@port = port
- target_addr = Address.new(host, port)
- super(nil, target_addr, NullDispatcher.new, loop)
+ addr = Address.new(host, port)
+ super(addr, dtopt, NullDispatcher.new, nil, loop)
@timer = Timer.new(1, true) {
step_timeout
}
loop.attach(@timer)
end
attr_reader :host, :port
- def self.open(host, port, loop = Loop.new, &block)
- cli = new(host, port, loop)
+ def self.open(host, port, loop = Loop.new, dtopt = TransportOption.new, &block)
+ cli = new(host, port, loop, dtopt)
begin
block.call(cli)
ensure
cli.close
end
@@ -694,111 +1275,136 @@
include LoopUtil
end
class SessionPool
- def initialize(loop = Loop.new)
+ def initialize(loop = Loop.new, dtopt = TransportOption.new)
@loop = loop
@spool = {}
@stimer = Timer.new(1, true, &method(:step_timeout))
+ @dtopt = dtopt
loop.attach(@stimer)
end
def get_session(host, port)
- target_addr = Address.new(host, port)
- @spool[target_addr] ||= create_session(target_addr)
+ addr = Address.new(host, port)
+ get_session_addr(addr)
end
+ def get_session_addr(addr)
+ @spool[addr] ||= create_session(addr)
+ end
+
def close
- @spool.reject! {|target_addr, s|
+ @spool.reject! {|addr, s|
s.close
true
}
@stimer.detach if @stimer.attached?
nil
end
include LoopUtil
protected
- def create_session(target_addr)
- Session.new(nil, target_addr, nil, @loop)
+ def create_session(addr)
+ Session.new(addr, @dtopt, nil, nil, @loop)
end
private
def step_timeout
- @spool.each_pair {|target_addr, s|
+ @spool.each_pair {|addr, s|
s.step_timeout
}
end
end
class Server < SessionPool
- class Socket < RPCSocket
- def initialize(sock, dispatcher, loop)
- s = Session.new(nil, nil, dispatcher, loop)
- super(sock, s)
- end
+ def initialize(loop = Loop.new)
+ super(loop, TransportOption.new)
+ @dispatcher = nil
+ @listeners = []
end
- def initialize(loop = Loop.new)
- super(loop)
- @lsocks = []
+ def serve(obj, accept = obj.public_methods)
+ @dispatcher = ObjectDispatcher.new(obj, accept)
end
- def listen(host, port, obj, accept = obj.public_methods)
- dispatcher = ObjectDispatcher.new(obj, accept)
- lsock = Rev::TCPServer.new(host, port, Server::Socket, dispatcher, @loop)
- @lsocks.push(lsock)
- @loop.attach(lsock)
+ def listen(host, port, obj = nil, accept = obj.public_methods)
+ unless obj.nil?
+ serve(obj, accept)
+ end
+ listen_real(host, port)
+ self
end
+ def listen_real(host, port) #, proto = PROTO_TCP)
+ creator = Proc.new { create_session(nil) }
+ #case proto
+ #when PROTO_TCP, :tcp
+ # listener = TCPTransport::Listener.new(host, port, &creator)
+ #when PROTO_UDP, :udp
+ # listener = UDPTransport::Listener.new(host, port, &creator)
+ #else
+ # raise "unknown option: #{proto.inspect}"
+ #end
+ listener = TCPTransport::Listener.new(host, port, &creator)
+ @listeners.push(listener)
+ listener.activate(@loop)
+ self
+ end
+
def close
- @lsocks.reject! {|lsock|
- lsock.detach if lsock.attached?
- lsock.close
+ @listeners.reject! {|listener|
+ listener.close
true
}
super
end
+
+ protected
+ def create_session(addr)
+ Session.new(addr, @dtopt, @dispatcher, nil, @loop)
+ end
end
class Cluster < SessionPool
- class Socket < RPCSocket
- def initialize(sock, binder)
- @binder = binder
- super(sock, nil)
- end
-
- def on_message(msg)
- if bound?
- super
- else
- @binder.call(self, msg)
- end
- end
- end
-
- def initialize(host, port, loop = Loop.new)
- super(loop)
+ def initialize(host, port, loop = Loop.new, dtopt = TransportOption.new)
+ super(loop, dtopt)
@host = host
@port = port
@dispatcher = nil
- self_addr = Address.new(host, port)
- @initmsg = RPCSocket.pack_init(self_addr)
+ @self_addr = Address.new(host, port)
+ @listeners = []
+ listen(host, port) # FIXME obsolete?
end
attr_reader :host, :port
def serve(obj, accept = obj.public_methods)
@dispatcher = ObjectDispatcher.new(obj, accept)
- @lsock = Rev::TCPServer.new(host, port, Cluster::Socket, method(:lazy_bind))
- @loop.attach(@lsock)
self
end
+ def listen(host, port) #, proto = PROTO_TCP)
+ lz = LazyBinder.new(self)
+ creator = Proc.new { lz }
+ #case proto
+ #when PROTO_TCP, :tcp
+ # listener = TCPTransport::Listener.new(host, port, &creator)
+ #when PROTO_UDP, :udp
+ # listener = UDPTransport::Listener.new(host, port, &creator)
+ #else
+ # raise "unknown option: #{proto.inspect}"
+ #end
+ listener = TCPTransport::Listener.new(host, port, &creator)
+ @listeners.push(listener)
+ listener.activate(@loop)
+ self
+ end
+
def close
if @lsock
@lsock.detach if @lsock.attached?
@lsock.close
end
@@ -806,30 +1412,41 @@
end
include LoopUtil
protected
- def create_session(target_addr)
- Session.new(@initmsg, target_addr, @dispatcher, @loop)
+ def create_session(addr)
+ Session.new(addr, @dtopt, @dispatcher, @self_addr, @loop)
end
+ public
+ def create_server_session
+ # FIXME
+ Session.new(nil, @dtopt, @dispatcher, @self_addr, @loop)
+ end
+
private
- def lazy_bind(sock, msg)
- if msg[0] == INIT
- # cluster
- target_addr = Address.load(msg[1])
- s = Session.new(@initmsg, target_addr, @dispatcher, @loop)
- sock.bind_session(s)
- @spool[target_addr] = s
- else
- # subsys
- s = Session.new(nil, nil, @dispatcher, @loop)
- sock.bind_session(s)
- sock.on_message(msg)
+ class LazyBinder < Session
+ def initialize(cluster)
+ @cluster = cluster
end
+
+ def on_message(sock, msg) # Session interface
+ if msg[0] == SESSION
+ # cluster
+ addr = Address.load(msg[1])
+ session = @cluster.get_session_addr(addr)
+ sock.rebind(session)
+ else
+ # subsys
+ session = @cluster.create_server_session
+ sock.rebind(session)
+ sock.on_message(msg)
+ end
+ end
end
end
-end # module RPC
-end # module MessagePack
+end
+end