require 'network_interface_api.rb' class QoSQueuee pp e,e.backtrace end } @sendThread[:type]=:sendThread @missSendThread=Thread.new { begin missSend! rescue Object=>e pp e,e.backtrace end } @missSendThread[:type]=:sendMissThread end def stop puts "STOPPING QOS" @quit=true sleep 0.1 @sendThread.kill @missSendThread.kill if @sendThread.alive? sleep 0.1 @sendThread.kill! end if @missSendThread.alive? sleep 0.1 @missSendThread.kill! puts "HARD KILL" end end def sendMessage(msg) @queue<MAX_ACK_WAIT || @completeReceived-@lastAckNumberSend>MAX_ACK_WAIT_QUEUE sendAck end receivers.each{|k,v|v.call(@targetNode,data)} end } return nil end private def quit? @quit end def sendAck @myNode.sendMessage([@completeReceived,PKG_ACK,nil],@targetNode) @lastSentAckTime=Time.now @lastAckNumberSend=@completeReceived puts "SENDING ACK #{@completeReceived} --- #{@lastReceived}" end def send! while not quit? data=@queue.pop while @sendId-@lastRecvAck>MAX_ACK_WAIT_QUEUE puts "ENQUEUE (waiting: #{@queue.length}" sleep 0.2 # wait end @sendMutex.synchronize { @sendId+=1 @myNode.sendMessage([@sendId,PKG_NORMAL,data],@targetNode) puts "QoSSEND #{@sendId}" @missBuffer<<[@sendId,data] } end end def missSend! while not quit? if @sendId-@lastRecvAck>=MAX_ACK_WAIT_QUEUE @sendMutex.synchronize { # resend @missBuffer=@missBuffer.select{|b|b[0]>@lastRecvAck}.sort_by{|b|b[0]} } @missBuffer[0...SEND_PKGS].each{|b| id,data=b puts "RESEND #{id}" @myNode.sendMessage([id,PKG_NORMAL,data],@targetNode) } end sleep WAIT_TIME end end end def initialize(node) super @mutex=Mutex.new @conns={} @quit=false end def conn(from) @mutex.synchronize { return nil if @quit @conns[from]||=Connection.new(@node,from) } end def stop puts "TRYING TO STOP QOS_QUEUE" @mutex.synchronize { puts "QOS_QUEUE stop #{@conns.length}" @conns.each{|k,v| puts "STOP:",v v.stop } @quit=true } end def receive(from,args) c=conn(from) return c.recv(args,@receivers) if c nil end def sendMessage(data,node) c=conn(node) return c.sendMessage(data) if c nil end end