require 'secure_marshal.rb' require 'tools/waiting_queue.rb' class UdpTransceiver UDPSize=4096 Overhead=400 Message=Struct.new(:type, :counter, :content) class Message def self.ack(counter) self.new(:ack,counter,nil) end def self.request(counter) self.new(:request,counter,nil) end def self.create(counter,content) self.new(:pkg,counter,content) end end Packet=Struct.new(:from, :to, :message) class Stream def initialize(connection) @connection=connection end def write(s) len=@connection.maxLength-1 len1=len-1 while s.length>0 if s.length>len x=s[0..len1] s=s[len..-1] @connection.send(x) else @connection.send(s) s="" end end end def read s="" loop do tmp=@connection.read if tmp.nil? break else s+=tmp end end s end end class Connection def initialize(node,transceiver) @node=node @transceiver=transceiver @mutex=Mutex.new @queue=[] # FIXME: add event api end def stream Stream.new(self) end def matches(host,port) pp @node,[host,port] @node==[host,port] end def maxLength UDPSize-Overhead end def send(content) if content.length>maxLength raise "#{content} is too long" end @transceiver.send(@node,content) end def read @mutex.synchronize { @queue.shift } end def close @transceiver.close(self) end def enqueue(packet) @mutex.synchronize { pp "ENQ:",packet @queue << packet } end end def initialize(bind,port) @port=port @bind=bind @socket=UDPSocket.open @socket.bind(@bind,@port) @inQueue=WaitingQueue.new @outQueue=WaitingQueue.new @outCounter={} @inCounter={} @outCounterMutex=Mutex.new @inCounterMutex=Mutex.new @connsMutex=Mutex.new @connections=[] @quit=false @receivers=[] @threads=[] @threads << Thread.new{ begin recvLoop rescue Object=>e pp "RECV:",e,e.backtrace end } # process ingthread @threads << Thread.new{ begin processInQueue rescue Object=>e pp "PROCESS IN QUEUE",e,e.backtrace end } # sending thread @threads << Thread.new{ begin send! rescue Object=>e pp "SEND",e,e.backtrace end } end def close! Thread.critical=true @socket.close @socket=nil Thread.critical=false end def close(conn) @connsMutex.synchronize { @connections.delete(conn) } end def connect(node) c=Connection.new(nodeOrtho(node),self) @connsMutex.synchronize { @connections << c } c end def me "udpsec://#{@bind}:#{@port}" end def send(node,content) node=nodeOrtho(node) @outQueue << Packet.new(me,node,Message.create(counterInc(node),content)) end def receive(&block) @receivers << block end def stop @quit=true @threads.each{|th|th.kill} @socket.close end private def pair2address(a=nil) a=["127.0.0.1",a[1]] if a[0]=="::1" a||=myAddress "udpsec://"+a.join(":") end def myAddress [@bind,@port] end def nodeOrtho(node) if node.is_a?(String) assert{node=~/udpsec/} ar=node.gsub(/.*\//,"").split(":") port=ar[-1] host=ar[0..-2].join(":") node=[host,port.to_i] end [IPSocket::getaddress(node[0]),node[1]] end def send! while not quit? o=@outQueue.pop str=Secure::Marshal::dump(o.message) host=o.to[0] port=o.to[1] #soc=UDPSocket.open soc=@socket host="localhost" if host=="::1" begin #pp "SEND #{str.length} to #{host.inspect} #{port.inspect}" soc.send(str,0,host,port) rescue Object=>e pp e,e.backtrace end #measureTime { sleep 0.001 # wait a little time to let get udp packets get through #} end end def counterInc(node) @outCounterMutex.synchronize { @outCounter[node]||=0 @outCounter[node]+=1 } end def recvLoop while not quit? #puts "select..." IO.select([@socket]) r=@socket.recvfrom_nonblock(UDPSize) #pp r push(r) end end def push(r) #@inQueueMutex.synchronize { @inQueue << r #} #processInQueue! end private def quit? @quit end def processInQueue missed=[] while not quit? i=@inQueue.pop conns=nil @connsMutex.synchronize { conns=@connections } content,from=i content=Secure::Marshal::load(content) if content.type==:pkg node=nodeOrtho(from[1..2].reverse) action=nil matches=false # check if this is the next packet @inCounterMutex.synchronize { @inCounter[node]||=1 missed.delete([node,content.counter]) if @inCounter[node]==content.counter action=:take @inCounter[node]+=1 matches=true elsif @inCounter[node]>content.counter action=:ignore matches=true # ignore else action=:missed # missed a packet ? missed << [node,@inCounter[node]] end } if action==:take conns.each{|c| if c.matches(*node) c.enqueue(content.content) end } @receivers.each{|r|r.call(pair2address(node),content.content)} end end # FIXME: add fetching rest (?) end end end