Sha256: 64f275ca070644fb102d483253dfd494907d5e8f583c1a1ecb685927f1251ad1
Contents?: true
Size: 2 KB
Versions: 1
Compression:
Stored size: 2 KB
Contents
module DCell
  # Servers handle incoming 0MQ traffic.
  #
  # A Server owns a single ZMQ::PULL socket bound to DCell.addr, reads string
  # messages from it, decodes them and calls #dispatch on the decoded object.
  class Server
    include Celluloid::ZMQ

    # Raised when an incoming message cannot be decoded.
    class InvalidMessageError < StandardError; end

    # Create a PULL socket, bind it to DCell.addr and start the receive loop.
    #
    # The address is read from DCell.addr (expected in URL form, e.g.
    # tcp://host:port). If setting LINGER or binding fails, the socket is
    # closed before the error is raised.
    #
    # @raise [RuntimeError] if ZMQ::LINGER cannot be set or the bind fails
    def initialize
      @addr   = DCell.addr
      @socket = DCell.zmq_context.socket(::ZMQ::PULL)

      unless ::ZMQ::Util.resultcode_ok?(@socket.setsockopt(::ZMQ::LINGER, 0))
        # Capture the error text before closing, so the close call cannot
        # affect what gets reported.
        error = ::ZMQ::Util.error_string
        @socket.close
        raise "couldn't set ZMQ::LINGER: #{error}"
      end

      unless ::ZMQ::Util.resultcode_ok?(@socket.bind(@addr))
        error = ::ZMQ::Util.error_string
        @socket.close
        raise "couldn't bind to #{@addr}: #{error}"
      end

      # NOTE(review): presumably Celluloid's asynchronous invocation of #run,
      # so that initialize returns while the loop keeps running - confirm.
      run!
    end

    # Wait for incoming 0MQ messages and hand each one to #handle_message.
    #
    # Loops until an error is raised.
    #
    # @raise [RuntimeError] if receiving a string from the socket fails
    def run
      loop do
        wait_readable @socket

        # recv_string fills the buffer passed to it, so a fresh one is
        # needed on every iteration.
        message = ''
        rc = @socket.recv_string message

        unless ::ZMQ::Util.resultcode_ok?(rc)
          raise "error receiving ZMQ string: #{::ZMQ::Util.error_string}"
        end

        handle_message message
      end
    end

    # Shut down the server by closing its socket.
    def finalize
      @socket.close
    end

    # Decode an incoming raw message and dispatch it.
    #
    # Undecodable messages are logged as warnings and dropped. Errors raised
    # while dispatching are logged and do not propagate to the receive loop.
    #
    # @param message [String] raw message as received from the socket
    def handle_message(message)
      begin
        message = decode_message message
      rescue InvalidMessageError => ex
        Celluloid::Logger.warn("couldn't decode message: #{ex.class}: #{ex}")
        return
      end

      begin
        message.dispatch
      rescue => ex
        # Only StandardError is rescued here: signals, SystemExit and
        # out-of-memory conditions must not be swallowed by the server.
        Celluloid::Logger.crash("DCell::Server: message dispatch failed", ex)
      end
    end

    # Decode an incoming raw message.
    #
    # A message is recognised as Marshal data when its first two bytes equal
    # Marshal's major and minor version numbers; no other format is supported.
    #
    # @param message [String] raw message bytes
    # @return [Object] the object produced by Marshal.load
    # @raise [InvalidMessageError] if the format is not recognised or
    #   unmarshalling fails
    def decode_message(message)
      marshal_header = [Marshal::MAJOR_VERSION, Marshal::MINOR_VERSION]

      unless message[0..1].unpack("CC") == marshal_header
        raise InvalidMessageError, "couldn't determine message format: #{message}"
      end

      begin
        # SECURITY: Marshal.load can instantiate arbitrary objects and must
        # only be fed data from trusted peers. The bytes here come straight
        # off the network socket.
        Marshal.load message
      rescue => ex
        raise InvalidMessageError, "invalid message: #{ex}"
      end
    end

    # Terminate this server: close the socket, then defer to the inherited
    # terminate.
    def terminate
      @socket.close
      super
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
dcell-0.0.1 | lib/dcell/server.rb |