Responsible for keeping in-memory metadata on known peers in the form of PeerConnections.
# File lib/journeta/peer_registry.rb, line 58
# Returns the result of +all_do+ for the given +all_groups+ flag, computed
# while the registry mutex is held.
#
# all_groups:: forwarded unchanged to +all_do+ (defaults to false).
def all(all_groups = false)
  # Relies on synchronize handing back the block's value, as Mutex#synchronize does.
  @mutex.synchronize { all_do(all_groups) }
end
# File lib/journeta/peer_registry.rb, line 31
# Empties the registry by removing every entry from +@peers+ while the
# registry mutex is held.
#
# NOTE(review): the removed peers are not stopped and no "unregistered"
# notification is sent from here -- confirm that is intended.
def clear
  @mutex.synchronize do
    # The listing called +h.clear+, but no +h+ exists in this method; the
    # registry's storage is +@peers+ everywhere else.
    @peers.clear
  end
end
# File lib/journeta/peer_registry.rb, line 37
# Reaper loop. Never returns: each pass takes the registry mutex, removes every
# peer whose +updated_at+ is older than +@reaper_tolerance+ (presumably seconds
# -- confirm), then sleeps for +@reaper_period+ before the next pass.
#
# Each reaped peer is stopped, deleted from +@peers+ by its uuid, and reported
# through +send_peer_unregistered+.
def go
  loop do
    @mutex.synchronize do
      # One cutoff per sweep so every peer is judged against the same instant.
      cutoff = Time.now - @reaper_tolerance
      # Collect first, delete afterwards: +@peers+ must not be mutated while
      # it is being iterated.
      stale = @peers.values.select { |peer| peer.updated_at < cutoff }
      stale.each do |peer|
        peer.stop
        @peers.delete peer.uuid
        send_peer_unregistered(peer)
      end
    end
    # Sleep outside the synchronized section so the mutex is free between sweeps.
    sleep @reaper_period
  end
end
Adds a PeerConnection to the registry. Calling +PeerConnection#start+ before adding it is optional: a newly registered peer is started by the registry itself. If a peer with the same UUID is already registered, the given peer will be stopped and the existing one updated.
# File lib/journeta/peer_registry.rb, line 68
# Adds +peer+ to the registry, or folds its settings into the entry already
# stored under the same uuid.
#
# peer:: must be exactly a PeerConnection instance.
#
# Raises RuntimeError when +peer+ is nil or is not a PeerConnection.
def register(peer)
  raise "Do not try to register a nil peer!" if peer.nil?
  unless peer.class == PeerConnection
    raise "You can only add #{PeerConnection} instances to this registry, not #{peer.class}!"
  end

  @mutex.synchronize do
    known = @peers[peer.uuid]
    if known.nil?
      # First sighting of this uuid: start the connection and store it.
      putsd "Adding peer #{peer.uuid}."
      peer.start
      @peers[peer.uuid] = peer
      send_peer_registered(peer)
    else
      # Already registered: the incoming object is only a carrier of new settings.
      putsd "Updating peer #{peer.uuid}."
      peer.stop
      # Blank the incoming creation date so the original entry's date is not
      # overridden (presumably update_settings ignores nil values -- confirm).
      peer.created_at = nil
      known.update_settings peer
      send_peer_updated(known)
    end
  end
end
# File lib/journeta/peer_registry.rb, line 137
# Invokes the engine's +peer_registered_handler+ with +peer+ on a new thread.
#
# Returns nil without doing anything when +peer+ is nil; otherwise returns the
# Thread that runs the handler.
def send_peer_registered(peer)
  return if peer.nil?
  handler = @engine.peer_registered_handler
  # Block parameters get their own names so they do not shadow this method's locals.
  Thread.new(handler, peer) { |callback, subject| callback.call(subject) }
end
# File lib/journeta/peer_registry.rb, line 149
# Invokes the engine's +peer_unregistered_handler+ with +peer+ on a new thread.
#
# Returns nil without doing anything when +peer+ is nil; otherwise returns the
# Thread that runs the handler.
def send_peer_unregistered(peer)
  return if peer.nil?
  handler = @engine.peer_unregistered_handler
  # Block parameters get their own names so they do not shadow this method's locals.
  Thread.new(handler, peer) { |callback, subject| callback.call(subject) }
end
# File lib/journeta/peer_registry.rb, line 143
# Invokes the engine's +peer_updated_handler+ with +peer+ on a new thread.
#
# Returns nil without doing anything when +peer+ is nil; otherwise returns the
# Thread that runs the handler.
def send_peer_updated(peer)
  return if peer.nil?
  handler = @engine.peer_updated_handler
  # Block parameters get their own names so they do not shadow this method's locals.
  Thread.new(handler, peer) { |callback, subject| callback.call(subject) }
end
# File lib/journeta/peer_registry.rb, line 102
# Hands +payload+ to +send_payload+ on every peer returned by +all_do+ (called
# with its default argument), all while holding the registry mutex. Logs via
# +putsd+ either the number of recipients or that there were none.
def send_to_known_peers(payload)
  @mutex.synchronize do
    # Only peers in relevant groups are addressed.
    recipients = all_do
    count = recipients.length
    if count.zero?
      putsd 'No peers (in relevant groups) to send message to!'
    else
      putsd "Sending payload to #{count} peers."
      recipients.each_value { |conn| conn.send_payload payload }
    end
  end
end
# File lib/journeta/peer_registry.rb, line 128
# Looks up the peer stored under +uuid+ and, if one exists, passes +payload+ to
# its +send_payload+. The lookup and the send both happen under the registry
# mutex. An unknown +uuid+ is silently ignored.
def send_to_peer(uuid, payload)
  @mutex.synchronize do
    target = @peers[uuid]
    target.send_payload(payload) if target
  end
end
Removes a PeerConnection from the registry. If the peer is still broadcasting presence, it will magically become reregistered at some point!
# File lib/journeta/peer_registry.rb, line 92
# Stops +peer+ and removes the entry stored under its uuid, under the registry
# mutex. Does nothing when +peer+ is nil or is not exactly a PeerConnection.
#
# The peer is stopped even when it has no uuid; only the deletion is skipped.
def unregister(peer)
  return if peer.nil? || peer.class != PeerConnection

  @mutex.synchronize do
    peer.stop
    @peers.delete peer.uuid if peer.uuid
  end
end
# File lib/journeta/peer_registry.rb, line 157
# Builds a new Hash of uuid => peer from +@peers+. Takes no lock itself.
#
# all_groups:: when true, every registered peer is included; otherwise a peer
#              is included only if it has no group list at all, or at least one
#              of its groups is accepted by the engine (the engine accepts any
#              group when its own +groups+ is nil).
def all_do(all_groups = false)
  # Copy into a fresh Hash so the original structure cannot be corrupted.
  return {}.update(@peers) if all_groups

  visible = {}
  @peers.each do |uuid, peer|
    relevant =
      if peer.groups
        peer.groups.any? { |g| @engine.groups.nil? || @engine.groups.include?(g) }
      else
        # Peer is a member of all groups.
        true
      end
    visible[uuid] = peer if relevant
  end
  visible
end
Disabled; run with --debug to generate this.
Generated with the Darkfish Rdoc Generator 1.1.6.