Journeta::PresenceListener

Listens for presence broadcasts from peers using Ruby's low-level socket API, which is less a designed API than a thin wrapper over the underlying C socket calls. Each inbound datagram is handed off to a background thread immediately so the listener can keep responding to inbound traffic as fast as possible.
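The hand-off pattern described above can be sketched in isolation. This is an illustration only, not Journeta's implementation: it uses a plain loopback UDP socket instead of multicast, and the helper name `receive_in_background` and the upcasing "work" are hypothetical stand-ins for parsing and peer registration.

```ruby
require "socket"

# Hypothetical helper: receives `count` datagrams on `socket` and processes
# each one in its own thread, so the receive loop is never blocked by the work.
def receive_in_background(socket, count)
  results = Queue.new
  workers = Array.new(count) do
    data, sender = socket.recvfrom(1024)
    Thread.new(data, sender) do |payload, info|
      # info is [address_family, port, hostname, numeric_address].
      # Upcasing stands in for the real work (parsing, registration).
      results << [payload.upcase, info[3]]
    end
  end
  workers.each(&:join)
  Array.new(count) { results.pop }
end

# Loopback demonstration: bind to an ephemeral port and send two datagrams.
listener = UDPSocket.new
listener.bind("127.0.0.1", 0)
port = listener.addr[1]

sender = UDPSocket.new
sender.send("hello", 0, "127.0.0.1", port)
sender.send("world", 0, "127.0.0.1", port)

processed = receive_in_background(listener, 2)
payloads  = processed.map(&:first).sort
addresses = processed.map(&:last).uniq

listener.close
sender.close
```

The worker threads may finish in any order, which is why the payloads are sorted before being inspected.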

Public Instance Methods

go()
    # File lib/journeta/presence_listener.rb, line 15
    def go
      presence_address = @engine.presence_address
      port = @engine.presence_port
      # Value for IP_ADD_MEMBERSHIP: the multicast group address followed by the
      # local interface address (0.0.0.0 lets the OS choose the interface).
      addresses = IPAddr.new(presence_address).hton + IPAddr.new("0.0.0.0").hton
      begin
        socket = UDPSocket.new
        # The options below map directly onto the C socket API.
        # `man setsockopt` for details.
        #
        # The PLATFORM constant got changed to RUBY_PLATFORM in the 1.9 MRI, so we have to handle both cases. :(
        # Also note that jruby 1.3.1 uses PLATFORM.
        if (defined?(PLATFORM) && PLATFORM.match(/linux/)) || (defined?(RUBY_PLATFORM) && RUBY_PLATFORM.match(/linux/))
          socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, [1].pack("i_"))
        elsif (defined?(PLATFORM) && PLATFORM.match(/java/)) || (defined?(RUBY_PLATFORM) && RUBY_PLATFORM.match(/java/))
          socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, [1].pack("i_")) # TODO test this jruby case!
        else
          # SO_REUSEPORT is needed so multiple peers can be run on the same machine.
          # socket.setsockopt(Socket::IPPROTO_IP, Socket::IP_TTL, [1].pack('i')) # Preston's original config for OS X.
          socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEPORT, [1].pack("i_")) # Remi's suggested default.
        end
        socket.setsockopt(Socket::IPPROTO_IP, Socket::IP_ADD_MEMBERSHIP, addresses)
        socket.bind(Socket::INADDR_ANY, port)
        putsd "Waiting for presence events."
        loop do
          # 1024 bytes is an arbitrary cap on the size of a presence datagram.
          data, meta = socket.recvfrom 1024
          Thread.new(data, meta) do |payload, sender|
            begin
              # Parse once, inside the rescue, so a malformed datagram gets logged
              # instead of silently killing the thread.
              event = YAML.load(payload)
              # Ignore our own presence broadcasts.
              if event.uuid != @engine.uuid
                peer = PeerConnection.new @engine
                # recvfrom reports the sender as an array of
                # [address_family, port, hostname, numeric_address], hence index 2.
                peer.ip_address = sender[2]
                peer.peer_port = event.peer_port
                peer.uuid = event.uuid
                peer.version = event.version
                peer.groups = event.groups
                peer.created_at = peer.updated_at = Time.now

                # We should not start the #PeerConnection before registering because a running
                # PeerConnection might already be registered. In this case, we'd have wasted a thread,
                # so we'll let the registry handle startup (if it happens at all.)
                #
                # peer.start

                # TODO validate peer entry is sane before registering it
                @engine.register_peer peer
              end
            rescue => e
              putsd "Error processing presence event: #{e.message}"
            end
          end
          # putsd "Event received!"
        end
      ensure
        putsd "Closing presence listener socket."
        # socket is nil if UDPSocket.new itself raised.
        socket.close if socket
      end
    end
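The two packed values handed to `setsockopt` in the listing are easier to follow when inspected on their own. The sketch below only builds and examines the byte strings; it does not open a socket or join a group. The helper name `membership_request` and the group address `239.1.2.3` are assumptions chosen for illustration, not Journeta's actual configuration.

```ruby
require "ipaddr"

# Hypothetical helper: builds the 8-byte value expected by IP_ADD_MEMBERSHIP,
# i.e. 4 bytes of multicast group address followed by 4 bytes of local
# interface address, both in network byte order.
def membership_request(group, interface = "0.0.0.0")
  IPAddr.new(group).hton + IPAddr.new(interface).hton
end

# 239.1.2.3 is an arbitrary example group (an assumption, see above).
mreq = membership_request("239.1.2.3")
mreq_bytes = mreq.unpack("C*")

# Boolean socket options such as SO_REUSEADDR take a native C int;
# "i_" packs the Ruby integer 1 in that platform-dependent layout.
flag = [1].pack("i_")
flag_value = flag.unpack("i_").first
```

Passing `0.0.0.0` as the interface leaves the choice of network interface to the operating system, which is what the listing does.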


Generated with the Darkfish Rdoc Generator 1.1.6.