Sha256: 8355dc02698ceefe4d5a58fa72ca62fe54d03cf5d7ce3c5c2dae8175ba55c6f2

Contents?: true

Size: 1.84 KB

Versions: 1

Compression:

Stored size: 1.84 KB

Contents

require 'json'
require 'socket'

module Zeus
  class Server
    # Keeps a registry of acceptors (pid, socket, command names, description)
    # and of IOs waiting for a command to be registered.
    #
    # Messages arrive as IO objects passed over a UNIX socket pair: a sender
    # passes an IO through #acceptor_registration_socket, and the first line
    # readable from that IO must be a JSON object whose 'type' is one of
    # 'wait', 'registration' or 'deregistration'.
    class AcceptorRegistrationMonitor

      # Record kept for every registered acceptor.
      AcceptorStub = Struct.new(:pid, :socket, :commands, :description)

      def initialize
        @reg_monitor, @reg_acceptor = UNIXSocket.pair
        @acceptors = []
        @pings = {} # command name (String) => IOs waiting for that command
      end

      # The monitor-side socket on which passed IOs are received.
      # NOTE(review): presumably polled by the server's event loop, which then
      # calls #on_datasource_event -- confirm against the caller.
      def datasource
        @reg_monitor
      end

      # Processes one pending message; see #handle_message.
      def on_datasource_event
        handle_message
      end

      # The socket senders use to pass an IO to this monitor.
      def acceptor_registration_socket
        @reg_acceptor
      end

      # Receives one passed IO, reads its JSON header line and dispatches on
      # the message type.
      #
      # Raises RuntimeError ("invalid message") for an unknown type; errors
      # from reading or parsing the header propagate unchanged. In both cases
      # the received IO is closed first, since nothing else will ever own it.
      def handle_message
        io = @reg_monitor.recv_io
        data, type = read_message(io)

        case type
        when 'wait'           then handle_wait(io, data)
        when 'registration'   then handle_registration(io, data)
        when 'deregistration' then handle_deregistration(io, data)
        else
          io.close unless io.closed?
          raise "invalid message"
        end
      end

      # Queues +io+ to be told "ready" once an acceptor registers for
      # data['command'].
      def handle_wait(io, data)
        command = data['command'].to_s
        (@pings[command] ||= []) << io
      end

      # Drops every acceptor registered under data['pid'].
      # The IO that carried the message is not kept anywhere, so it is closed
      # here rather than left open until garbage collection.
      def handle_deregistration(io, data)
        pid = data['pid'].to_i
        @acceptors.reject! { |acceptor| acceptor.pid == pid }
      ensure
        io.close if io && !io.closed?
      end

      # Records a new acceptor, replacing any acceptor previously registered
      # with an identical command list, then wakes clients waiting on any of
      # its commands. +io+ is retained as the acceptor's socket.
      def handle_registration(io, data)
        pid         = data['pid'].to_i
        commands    = data['commands']
        description = data['description']

        @acceptors.reject! { |acceptor| acceptor.commands == commands }
        @acceptors << AcceptorStub.new(pid, io, commands, description)
        notify_pings_for_commands(commands)
      end

      # Sends "ready" to, and closes, every IO waiting on any of +commands+.
      # Each waiter is handled independently, so one that has already
      # disconnected cannot prevent the remaining waiters from being notified.
      def notify_pings_for_commands(commands)
        Array(commands).each do |command|
          waiters = @pings.delete(command.to_s)
          next unless waiters
          waiters.each { |ping| notify_ping(ping) }
        end
      end

      # Returns the first registered acceptor that serves +command+, or nil.
      # Names are compared as strings, matching how waiters are keyed, and an
      # acceptor that registered without a command list matches nothing.
      def find_acceptor_for_command(command)
        wanted = command.to_s
        @acceptors.detect do |acceptor|
          Array(acceptor.commands).any? { |name| name.to_s == wanted }
        end
      end

      private

      # Reads and parses the JSON header line from +io+.
      # Returns [data, type]. On any failure +io+ is closed and the error is
      # re-raised.
      def read_message(io)
        data = JSON.parse(io.readline.chomp)
        [data, data['type']]
      rescue StandardError
        io.close unless io.closed?
        raise
      end

      # Tells one waiter that its command is ready, then closes it.
      # IO errors are ignored: they mean the waiter has already gone away
      # (a buffered write can also fail at close time).
      def notify_ping(ping)
        begin
          ping.puts "ready\n"
        ensure
          ping.close
        end
      rescue IOError, SystemCallError
        nil
      end

    end

  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
zeus-0.2.4 lib/zeus/server/acceptor_registration_monitor.rb