Sha256: 80a642dd44d513589f27f3df6b34679f0c3b9300d1bcf1b71bbd63fd31b5a2f8
Contents?: true
Size: 1.94 KB
Versions: 1
Compression:
Stored size: 1.94 KB
Contents
module Zeus class Server class AcceptorRegistrationMonitor def datasource ; @sock ; end def on_datasource_event ; handle_message ; end # @__CHILD__sock is not closed here, as it's used by the master to respond # on behalf of unbooted acceptors def close_child_socket ; end def close_parent_socket ; @sock.close ; end def initialize @sock, @__CHILD__sock = UNIXSocket.pair @acceptors = [] @pings = {} end AcceptorStub = Struct.new(:pid, :socket, :commands, :description) def handle_message io = @sock.recv_io data = JSON.parse(io.readline.chomp) type = data['type'] case type when 'wait' ; handle_wait(io, data) when 'registration' ; handle_registration(io, data) else raise "invalid message" end end def handle_wait(io, data) command = data['command'].to_s @pings[command] ||= [] @pings[command] << io end def handle_registration(io, data) pid = data['pid'].to_i commands = data['commands'] description = data['description'] @acceptors.reject!{|ac|ac.commands == commands} @acceptors << AcceptorStub.new(pid, io, commands, description) notify_pings_for_commands(commands) end def notify_pings_for_commands(commands) (commands || []).each do |command| (@pings[command.to_s] || []).each do |ping| ping.puts "ready\n" ping.close end @pings[command.to_s] = nil end end module ChildProcessApi def __CHILD__find_acceptor_for_command(command) @acceptors.detect { |acceptor| acceptor.commands.include?(command) } end def __CHILD__register_acceptor(io) @__CHILD__sock.send_io(io) end end ; include ChildProcessApi end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
zeus-0.4.6 | lib/zeus/server/acceptor_registration_monitor.rb |