require 'gserver' require 'secure_marshal.rb' require 'whiteboard.rb' module Secure class IServere pp "SERVING EXCEPTION",e end end end class Connection attr_reader :valid def initialize(conn,server) @conn=conn @data="" @server=server @conn.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) @valid=true @mutex=Mutex.new end def send(*what) return unless @valid begin data=Secure::Marshal.dump(what) puts "Sending data #{data.length} from #{@conn.addr} to #{@conn.peeraddr}" @mutex.synchronize { if @valid len=data.length result=@conn.write(data) assert {len==result} @conn.flush else raise "Connection invalid" end } rescue Exception=>e glog e,e.backtrace @valid=false end end def remotePort begin @conn.peeraddr[1] rescue end end def remoteHost begin @conn.peeraddr[3] rescue end end def disconnect(notifyRemote=true) puts "Disconnecting from #{@conn.addr} to #{@conn.peeraddr}" begin #self.send(:closeConnection) if notifyRemote #@mutex.synchronize { #puts "SYNC Disconnecting from #{@conn.addr} to #{@conn.peeraddr}" @valid=false @server.removeConnection(self) @conn.close #} rescue Object=>e glog e end end # pseudo-private def receiveLoop return unless @valid begin loop do curdata=@conn.recv(1012) @data+=curdata if curdata.length>0 pp "GOT DATA (at #{@conn.addr}) #{@data.length}" else puts "NO DATA" break end d=Secure::Data.new(@data) begin result=Secure::Marshal.load(d) # important: loop until buffer is empty, because it's possible that there are more than 1 messages in @data while result @server.receive(self,result) @data=d.str result=Secure::Marshal.load(d) end rescue Secure::Marshal::OutOfData=>e pp e,@data end break unless @valid end rescue Exception=>e #glog e,e.backtrace,self unless e.inspect.to_s=="#" pp e,e.backtrace,self end end @server.removeConnection(self) @valid=false end end class Server attr_reader :messages attr_reader :connections def initialize(port) @iserver=IServer.new(self,port,"127.0.0.1",40) @iserver.audit = true @iserver.start @connections=[] @messages=[] @threads=[] @hook=nil @hookConnected=nil @hookDisconnected=nil end def stop @iserver.stop @threads.each{|th|th.kill} end def hook(&b) @hook=b end def hookDisconnected(&b) @hookDisconnected=b end def host @iserver.host end def port @iserver.port end def receive(from,xy) #pp "RECVED:#{from}:#{xy}" if @xy==:closeConnection from.disconnect(false) elsif @hook #pp "calling hook" @hook.call(from,xy,Time.now) else #pp "pushing message" @messages<<[from,xy,Time.now] end end def connect(there,port) puts "#{self}:CONNECTING #{there} #{port}" host=Socket.gethostbyname(there) sock=TCPSocket.new(there,port) c=Connection.new(sock,self) @connections<e glog "Exception in connect-Thread " ,e end } c end def hookConnected(&hook) @hookConnected=hook end def eventConnected(con) glog "eventConnected #{con}" @hookConnected.call(con) if @hookConnected end def addConnection(con) glog "addConnection #{con}" assert{con.is_a?(Connection)} @connections<localPort,:host=>localHost},true)) end def method_missing(name,*args) if @server.remote_sync_func?(name) tx=@server.getTxId(self) call=Call.new(:call,name,args,tx) #puts "simplecon: sending #{call.inspect}" @c.send(call) # wait for @server to get return #puts "WAIT FOR RETURN OF #{name} #{args.inspect} in tx #{tx}" @server.getReturn(tx,self) elsif @server.remote_func?(name) call=Call.new(:call,name,args,nil) pp "CALL",call @c.send(call) else super end end def remoteHost return @remoteHost if @remoteHost @c.remoteHost end def remotePort return @remotePort if @remotePort @c.remotePort end def disconnect @c.disconnect end def connection @c end end RemoteExceptionMS=Struct.new(:embedded) class RemoteExceptionMS def initialize(emb=nil) self.embedded=[] if emb.is_a?(Exception) self.embedded=[emb.to_s,emb.backtrace.to_s] end end end class RemoteExceptione if msg.tx from.send(Call.new(:exception,name,RemoteExceptionMS.new(e),msg.tx)) end end if msg.tx if finished from.send(Call.new(:return,name,result,msg.tx)) else end end else glog "invalid access with #{msg} from #{from} at #{time}" end elsif msg.type==:return @whiteboard.put(msg.tx,[:return,msg.args]) if false @rtMutex.synchronize { #puts "adding return #{msg.tx} #{msg.tx.class} on #{self}" @returns[msg.tx]=msg.args ##pp @returns } end elsif msg.type==:exception @whiteboard.put(msg.tx,[:exception,msg.args]) #@rtMutex.synchronize { # @exceptions[msg.tx]=msg.args #} elsif msg.type==:hi con=checkCon(from) con.remotePort=msg.args[:port] con.remoteHost=msg.args[:host] eventConnected(con.remoteHost,con.remotePort) end end } @server.hookConnected {|con| # nothing ATM - let it say "hi" first (see above) } end def getReturn(tx,conn) result=@whiteboard.get(tx) case result[0] when :return return result[1] else raise RemoteException.new(result[1]) end loop do #puts "waiting for return...#{tx} #{tx.class} (on #{self})" @rtMutex.synchronize { if @returns.key?(tx) r=@returns[tx] @returns.delete(tx) return r end if @exceptions.key?(tx) r=@exceptions[tx] @exceptions.delete(tx) raise RemoteException.new(r) end } raise "Connection closed" unless conn.connection.valid sleep 0.1 end end def getTxId(conn) cid=nil @txMutex.synchronize { @txId+=1 cid=@txId @txs[cid]=conn } cid end def self.allow(*funcNames) @@allowed||=[] @@allowed+=funcNames end def self.remote(*funcNames) @@remote||=[] @@remote+=funcNames end def self.remote_sync(*funcNames) @@remote_sync||=[] @@remote_sync+=funcNames end def connections @server.connections.map{|c|checkCon(c)} end def connect(host,port) checkCon(@server.connect(host,port)) end def remote_func?(name) @@remote||=[] @@remote.member?(name) end def remote_sync_func?(name) @@remote_sync||=[] @@remote_sync.member?(name) end def localHost @server.host end def localPort @server.port end def stop @server.stop end def eventConnected(host,port) glog "eventConnected #{host},#{port}" # pure virtual end private def checkCon(con) assert{con.valid} @connections[con]||=SimpleConnection.new(self,con,localHost,localPort) @connections[con] end end end