Sha256: c520eaa3934629b224115ecfaf2c1bec4c8262d947143ef4f482c86278209af9

Contents?: true

Size: 1.61 KB

Versions: 1

Compression:

Stored size: 1.61 KB

Contents

module DCell
  # Servers handle incoming 0MQ traffic
  class Server
    include Celluloid::ZMQ

    # Bind to the given 0MQ address (in URL form ala tcp://host:port)
    def initialize
      # The gossip protocol is dependent on the node manager
      link Celluloid::Actor[:node_manager]

      @addr   = DCell.addr
      @socket = PullSocket.new

      begin
        @socket.bind(@addr)
      rescue IOError
        @socket.close
        raise
      end

      run!
    end

    # Wait for incoming 0MQ messages
    def run
      while true; handle_message! @socket.read; end
    end

    # Shut down the server
    def finalize
      @socket.close if @socket
    end

    # Handle incoming messages
    def handle_message(message)
      begin
        message = decode_message message
      rescue InvalidMessageError => ex
        Celluloid::Logger.warn("couldn't decode message: #{ex.class}: #{ex}")
        return
      end

      begin
        message.dispatch
      rescue => ex
        Celluloid::Logger.crash("DCell::Server: message dispatch failed", ex)
      end
    end

    class InvalidMessageError < StandardError; end # undecodable message

    # Decode incoming messages
    def decode_message(message)
      if message[0..1].unpack("CC") == [Marshal::MAJOR_VERSION, Marshal::MINOR_VERSION]
        begin
          Marshal.load message
        rescue => ex
          raise InvalidMessageError, "invalid message: #{ex}"
        end
      else raise InvalidMessageError, "couldn't determine message format: #{message}"
      end
    end

    # Terminate this server
    def terminate
      @socket.close
      super
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
dcell-0.12.0.pre lib/dcell/server.rb