lib/ruby_skynet/server.rb in ruby_skynet-0.3.0 vs lib/ruby_skynet/server.rb in ruby_skynet-0.4.0.pre

- old
+ new

@@ -18,10 +18,23 @@ @@hostname = nil @@port = 2000 @@region = 'Development' @@server = nil + def self.start + @@server ||= supervise(hostname, port) + end + + def self.stop + @@server.terminate if @@server + @@server = nil + end + + def self.running? + (@@server != nil) && @@server.running? + end + # Region under which to register Skynet services # Default: 'Development' def self.region @@region end @@ -71,27 +84,23 @@ doozer.delete(klass.service_key) rescue nil end @@services.delete(klass.service_name) end - # Returns whether the server is running - def self.running? - (@@server != nil) && @@server.running? - end + # Start the server so that it can start taking RPC calls + # Returns false if the server is already running + def initialize(host, port) + # Since we included Celluloid::IO, we're actually making a + # Celluloid::IO::TCPServer here + # TODO If port is in use, try the next port in sequence + @server = TCPServer.new(host, port) + async.run - # Start the Server - def self.start - @@server = new - @@server.start + # Register services hosted by this server + self.class.services.each_pair {|key, klass| self.class.register_service_in_doozer(klass)} end - # Stop the Server - def self.stop - @@server.terminate - @@server = nil - end - def finalize @server.close if @server logger.info "Skynet Server Stopped" # Deregister services hosted by this server @@ -101,94 +110,32 @@ end end logger.info "Skynet Services De-registered in Doozer" end - # Returns whether the server is running - def running? - (@server != nil) && !@server.closed? - end - - ############################################################################ - protected - - attr_accessor :server - - # Register the supplied service in doozer - def self.register_service_in_doozer(klass) - config = { - "Config" => { - "UUID" => "#{Server.hostname}:#{Server.port}-#{$$}-#{klass.name}-#{klass.object_id}", - "Name" => klass.service_name, - "Version" => klass.service_version.to_s, - "Region" => Server.region, - "ServiceAddr" => { - "IPAddress" => Server.hostname, - "Port" => Server.port, - "MaxPort" => Server.port + 999 - }, - }, - "Registered" => true - } - RubySkynet::Registry.doozer_pool.with_connection do |doozer| - doozer[klass.service_key] = MultiJson.encode(config) - end - end - - # Start the server so that it can start taking RPC calls - # Returns false if the server is already running - def start - return false if running? - - # Since we included Celluloid::IO, we're actually making a - # Celluloid::IO::TCPServer here - # TODO If port is in use, try the next port in sequence - # TODO make port to listen on configurable - @server = TCPServer.new('0.0.0.0', self.class.port) - run! - - # Register services hosted by this server - self.class.services.each_pair {|key, klass| self.class.register_service_in_doozer(klass)} - true - end - def run logger.info("Starting listener on #{self.class.hostname}:#{self.class.port}") loop do logger.debug "Waiting for a client to connect" begin - handle_connection!(@server.accept) + async.handle_connection(@server.accept) rescue Exception => exc logger.error "Exception while processing connection request", exc end end end - # Called for each message received from the client - # Returns a Hash that is sent back to the caller - def on_message(service_name, method, params) - logger.benchmark_debug "Called: #{service_name}##{method}" do - logger.trace "Method Call: #{method} with parameters:", params - klass = Server.services[service_name] - raise "Invalid Skynet RPC call, service: #{service_name} is not available at this server" unless klass - service = klass.new - raise "Invalid Skynet RPC call, method: #{method} does not exist for service: #{service_name}" unless service.respond_to?(method) - # TODO Use pool of services, or Celluloid here - service.send(method, params) - end - end - # Called for each client connection def handle_connection(client) logger.debug "Client connected, waiting for data from client" # Process handshake handshake = { 'registered' => true, 'clientid' => BSON::ObjectId.new.to_s } - client.write(BSON.serialize(handshake)) + client.write(BSON.serialize(handshake).to_s) Common.read_bson_document(client) while(header = Common.read_bson_document(client)) do logger.debug "\n******************" logger.debug "Received Request" @@ -214,22 +161,65 @@ end if reply logger.debug "Sending Header" # For this test we just send back the received header - client.write(BSON.serialize(header)) + client.write(BSON.serialize(header).to_s) logger.debug "Sending Reply" logger.trace 'Reply', reply - client.write(BSON.serialize({'out' => BSON.serialize(reply).to_s})) + client.write(BSON.serialize({'out' => BSON.serialize(reply).to_s}).to_s) else logger.debug "Closing client since no reply is being sent back" break end end # Disconnect from the client client.close logger.debug "Disconnected from the client" + end + + # Returns whether the server is running + def running? + (@server != nil) && !@server.closed? + end + + ############################################################################ + protected + + # Register the supplied service in doozer + def self.register_service_in_doozer(klass) + config = { + "Config" => { + "UUID" => "#{Server.hostname}:#{Server.port}-#{$$}-#{klass.name}-#{klass.object_id}", + "Name" => klass.service_name, + "Version" => klass.service_version.to_s, + "Region" => Server.region, + "ServiceAddr" => { + "IPAddress" => Server.hostname, + "Port" => Server.port, + "MaxPort" => Server.port + 999 + }, + }, + "Registered" => true + } + RubySkynet::Registry.doozer_pool.with_connection do |doozer| + doozer[klass.service_key] = MultiJson.encode(config) + end + end + + # Called for each message received from the client + # Returns a Hash that is sent back to the caller + def on_message(service_name, method, params) + logger.benchmark_debug "Called: #{service_name}##{method}" do + logger.trace "Method Call: #{method} with parameters:", params + klass = Server.services[service_name] + raise "Invalid Skynet RPC call, service: #{service_name} is not available at this server" unless klass + service = klass.new + raise "Invalid Skynet RPC call, method: #{method} does not exist for service: #{service_name}" unless service.respond_to?(method) + # TODO Use pool of services, or Celluloid here + service.send(method, params) + end end end end