Uses the fucked up Ruby socket API (which isn’t really an API, just a thin layer over the C socket calls) to listen for presence broadcasts from peers. Each inbound datagram is immediately handed off to a background thread so the listener can get back to receiving traffic as fast as possible.
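The hand-off described above can be sketched over the loopback interface. This is only an illustration under stated assumptions, not Journeta's code: the port is chosen by the OS, the "hello" payload stands in for a real presence event, and the names listener, sender, results and handled are made up for the example. It also shows what the sender information returned by recvfrom looks like.

```ruby
require "socket"

# Listener bound to loopback; port 0 asks the OS for any free port.
listener = UDPSocket.new
listener.bind("127.0.0.1", 0)
port = listener.addr[1]

# Stand-in for a peer announcing itself (hypothetical payload).
sender = UDPSocket.new
sender.send("hello", 0, "127.0.0.1", port)

results = Queue.new
if IO.select([listener], nil, nil, 2) # wait at most 2 seconds for a datagram
  data, meta = listener.recvfrom(1024)
  # Passing data and meta as Thread.new arguments hands the worker its own
  # references, so the listener is free to reuse its variables right away.
  worker = Thread.new(data, meta) do |payload, sender_info|
    # Sender info is [address family, port, hostname, numeric address].
    family, _port, _host, ip = sender_info
    results << [payload, family, ip]
  end
  worker.join # only so the example can inspect the result
end

handled = results.empty? ? nil : results.pop
sender.close
listener.close
```

In a real listener the join would be dropped so the loop can return to recvfrom immediately; it is here only to make the result observable.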
# File lib/journeta/presence_listener.rb, line 15
def go
  presence_address = @engine.presence_address
  port = @engine.presence_port
  # Multicast group followed by the local interface (0.0.0.0 = any), packed as
  # the ip_mreq value that IP_ADD_MEMBERSHIP expects.
  addresses = IPAddr.new(presence_address).hton + IPAddr.new("0.0.0.0").hton
  begin
    socket = UDPSocket.new
    # Remember how i said this was fucked up? yeaahhhhhh. i hope you like C.
    # `man setsockopt` for details.
    #
    # The PLATFORM constant got changed to RUBY_PLATFORM in the 1.9 MRI, so we have to handle both cases. :(
    # Also note that jruby 1.3.1 uses PLATFORM.
    if (defined?(PLATFORM) && PLATFORM.match(/linux/)) || (defined?(RUBY_PLATFORM) && RUBY_PLATFORM.match(/linux/))
      socket.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, [1].pack("i_") )
    elsif (defined?(PLATFORM) && PLATFORM.match(/java/)) || (defined?(RUBY_PLATFORM) && RUBY_PLATFORM.match(/java/))
      socket.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, [1].pack("i_") ) # TODO test this jruby case!
    else
      # SO_REUSEPORT is needed so multiple peers can be run on the same machine.
      # socket.setsockopt(Socket::IPPROTO_IP, Socket::IP_TTL, [1].pack('i')) # Preston's original config for OS X.
      socket.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, [1].pack("i_") ) # Remi's suggested default.
    end
    socket.setsockopt(Socket::IPPROTO_IP, Socket::IP_ADD_MEMBERSHIP, addresses)
    socket.bind(Socket::INADDR_ANY, port)
    putsd "Waiting for presence events."
    loop do
      # Read at most 1024 bytes per datagram. Why 1024? umm.. because it's Thursday!
      data, meta = socket.recvfrom 1024
      Thread.new(data, meta) do |payload, sender|
        begin
          # Parse once, inside the rescue, so a malformed event is logged
          # instead of silently killing the thread.
          event = YAML.load(payload)
          if event.uuid != @engine.uuid
            peer = PeerConnection.new @engine
            # recvfrom's sender info is [address family, port, hostname, numeric address],
            # so [2] is the sender's host. They should have returned a hash instead.
            peer.ip_address = sender[2]
            peer.peer_port = event.peer_port
            peer.uuid = event.uuid
            peer.version = event.version
            peer.groups = event.groups
            peer.created_at = peer.updated_at = Time.now

            # We should not start the #PeerConnection before registering because a running
            # PeerConnection might already be registered. In this case, we'd have wasted a thread,
            # so we'll let the registry handle startup (if it happens at all.)
            #
            # peer.start

            # TODO validate peer entry is sane before registering it
            @engine.register_peer peer
          end
        rescue => e
          putsd "Error during peer registration: #{e.message}"
        end
      end
      # putsd "Event received!"
    end
  ensure
    putsd "Closing presence listener socket."
    # socket is nil if UDPSocket.new itself raised.
    socket.close if socket
  end
end
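The value passed to IP_ADD_MEMBERSHIP in the listing above is the C ip_mreq struct in disguise: four bytes of multicast group address followed by four bytes of local interface address, both in network byte order. A minimal sketch of that packing follows, assuming IPv4. The membership_request helper is hypothetical (it is not part of Journeta), and the group address is just an example value rather than a confirmed default.

```ruby
require "ipaddr"

# Hypothetical helper: builds the 8-byte ip_mreq value for IP_ADD_MEMBERSHIP.
# group     - IPv4 multicast group to join
# interface - local interface address; "0.0.0.0" lets the OS pick one
def membership_request(group, interface = "0.0.0.0")
  IPAddr.new(group).hton + IPAddr.new(interface).hton
end

mreq = membership_request("224.220.221.222") # example group address (assumption)
puts mreq.bytesize              # => 8
puts mreq.unpack("C*").inspect  # => [224, 220, 221, 222, 0, 0, 0, 0]
```

The result can be handed to setsockopt as the option value, as the listing does with its addresses variable.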
Generated with the Darkfish Rdoc Generator 1.1.6.