Sha256: 8355dc02698ceefe4d5a58fa72ca62fe54d03cf5d7ce3c5c2dae8175ba55c6f2
Contents?: true
Size: 1.84 KB
Versions: 1
Compression:
Stored size: 1.84 KB
Contents
module Zeus class Server class AcceptorRegistrationMonitor def datasource ; @reg_monitor ; end def on_datasource_event ; handle_message ; end def initialize @reg_monitor, @reg_acceptor = UNIXSocket.pair @acceptors = [] @pings = {} end AcceptorStub = Struct.new(:pid, :socket, :commands, :description) def handle_message io = @reg_monitor.recv_io data = JSON.parse(io.readline.chomp) type = data['type'] case type when 'wait' ; handle_wait(io, data) when 'registration' ; handle_registration(io, data) when 'deregistration' ; handle_deregistration(io, data) else raise "invalid message" end end def handle_wait(io, data) command = data['command'].to_s @pings[command] ||= [] @pings[command] << io end def handle_deregistration(io, data) pid = data['pid'].to_i @acceptors.reject!{|acc|acc.pid == pid} end def handle_registration(io, data) pid = data['pid'].to_i commands = data['commands'] description = data['description'] @acceptors.reject!{|ac|ac.commands == commands} @acceptors << AcceptorStub.new(pid, io, commands, description) notify_pings_for_commands(commands) end def notify_pings_for_commands(commands) (commands || []).each do |command| (@pings[command.to_s] || []).each do |ping| ping.puts "ready\n" ping.close end @pings[command.to_s] = nil end end def find_acceptor_for_command(command) @acceptors.detect { |acceptor| acceptor.commands.include?(command) } end def acceptor_registration_socket @reg_acceptor end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
zeus-0.2.4 | lib/zeus/server/acceptor_registration_monitor.rb |