require 'udp_transceiver.rb' require 'simple_api.rb' require 'big_transfer.rb' require 'message_queue.rb' require 'qos_queue.rb' class UdpNetworkApptimestamp && n.reachable!=false}.map{|n|n.node} end def nodesOlderThan(timestamp) @nodes.values.select{|n|n.timestamp<=timestamp && n.reachable!=false}.map{|n|n.node} end def nodes @nodes.values.select{|n|n.reachable!=false}.map{|n|n.node} end end def initialize(cluster,options) super(cluster,options,:fullNetworkNode) @port=9876 @api=nil loop do begin @transceiver=UdpTransceiver.new("127.0.0.1",@port) break rescue @port+=1 end end discovery=getApp(:networkDiscovery) if discovery discovery.add(self.me) end @services=[] @nodes=NodeStore.new @receivers={} log "UDPINIT" @protocolAPIs=[] addReceivers end def me @transceiver.me end def knownNodes @nodes.nodesNewerThan(0) end def run loop do gatherNodes spreadNodes sleep SPREADING_INTERVAL break if @quit end @quit=:ok end def sendMessage(message,node) @transceiver.send(node,message) end def addReceiver(receiver,&block) @receivers[receiver]=block end def removeReceiver(receiver) @receivers.delete(receiver) end def api(priority=nil) return @api if @api layers=[QoSQueue,BigTransfer,RemoteSimpleAPI] layers=[BigTransfer,RemoteSimpleAPI] api=self layers.each{|layer| @protocolAPIs << api=layer.new(api) } @api=api #messageQueue=MessageQueue.new(self,priority) #messageQueue=QoSQueue.new(self) #bigTransfer=BigTransfer.new(messageQueue) #@api||=RemoteSimpleAPI.new(bigTransfer) @api.hook('_networkNodes'){|nodes| nodes.each{|n|@nodes.add(n)} } #@protocolAPIs << messageQueue #@protocolAPIs << bigTransfer #@protocolAPIs << @api @api end protect :api def stop super log "UDP Alog successfully stopping #{@protocolAPIs.length}" log caller @quit=true log @protocolAPIs.length log "=========" @protocolAPIs.each{|api| api.stop } @transceiver.stop log "UDP Alog successfully stopped" end private def spreadNodes @lastSpread||=0 time=Time.now.to_f maxSend=10 newer=@nodes.nodesNewerThan(@lastSpread) older=@nodes.nodesOlderThan(@lastSpread) newer.each{|target| api.sendAsync(target,'_networkNodes',@nodes.nodes.shuffle[0...maxSend]) } older.each{|target| api.sendAsync(target,'_networkNodes',newer.shuffle[0...maxSend]) } @lastSpread=time end def gatherNodes discovery=getApp(:networkDiscovery) nodes=[] if discovery nodes=discovery.get.select{|str| str=~/udpsec/ } end nodes.each{|n|@nodes.add(n)} end def addReceivers @transceiver.receive{|from,message| @receivers.each{|rec,block|block.call(from,message)} } end end