Sha256: b191d484f6a421d058a95adb11d2130367b765661c886f7d6b2e055f870681c0
Contents?: true
Size: 1.8 KB
Versions: 1
Compression:
Stored size: 1.8 KB
Contents
# FIXME: maybe include check for package lost class BigTransfer def initialize(node) @node=node @receivers={} @mutex=Mutex.new @recMutex=Mutex.new @pkgId=0 @node.addReceiver(self) {|from,args| receive(from,args) } @packages={} end def me @node.me end def addReceiver(api,&block) @receivers[api]=block end def removeReceiver(app) @receivers.delete(app) end def log(*x) @node.log(*x) end def stop end # close the api-object - api won't listen anymore def close @node.removeReceiver(self) end def receive(from,args) pkgId=nil pkgs=nil @mutex.synchronize { assert{args.is_a?(Array)} pkgId,str,i,pkgs=args @packages[pkgId]||=[] @packages[pkgId]<<[str,i,pkgs] } if @packages[pkgId].length>=pkgs @recMutex.synchronize { ps=@packages[pkgId].clone ps=ps.uniq{|p|p[1]} if ps.length==pkgs # ps is mad unique - so if length is ok, data should be ok data=ps.sort{|a,b| a[1].to_i <=> b[1].to_i}.map{|p|p[0]}.join("") @receivers.each{|k,v|v.call(from,data)} @packages.delete(pkgId) else puts "PKG recv-COUNT #{ps.length} #{ps.map{|p|p[1]}}" end } end end def sendMessage(data,node) assert{data.is_a?(String)} maxSize=UdpTransceiver::UDPSize-UdpTransceiver::Overhead pkgs=(data.length.to_f/maxSize).ceil i=0 pkgId=getNewPkgId while data and data.length>0 str=data[0...maxSize] data=data[maxSize..-1] pkg=[pkgId,str,i,pkgs] i+=1 @node.sendMessage(pkg,node) end end private def getNewPkgId id=nil @mutex.synchronize { @pkgId+=1 id=@pkgId } id end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
appswarm-0.0.1 | apps/udp_network/big_transfer.rb |