lib/ruby_skynet/server.rb in ruby_skynet-0.4.0.pre2 vs lib/ruby_skynet/server.rb in ruby_skynet-0.4.0

- old
+ new

@@ -1,126 +1,114 @@ require 'bson' -require 'celluloid/io' -# Replace Celluloid logger immediately upon loading the Server Instance -Celluloid.logger = SemanticLogger::Logger.new('Celluloid') - # # RubySkynet Server # # Hosts one or more Skynet Services # module RubySkynet class Server - include Celluloid::IO include SemanticLogger::Loggable - # TODO Make Server instance based rather than class based. Then make instance global - @@hostname = nil - @@port = 2000 - @@region = 'Development' @@server = nil + @@services = ThreadSafe::Hash.new - def self.start - @@server ||= supervise(hostname, port) + # Start a single instance of the server + def self.start(region = 'Development', start_port = 2000, hostname = nil) + @@server ||= new(region, start_port, hostname) end + # Stop the single instance of the server def self.stop - @@server.terminate if @@server + @@server.finalize if @@server @@server = nil end + # Is the single instance of the server running def self.running? (@@server != nil) && @@server.running? end - # Region under which to register Skynet services - # Default: 'Development' - def self.region - @@region - end - - def self.region=(region) - @@region = region - end - - # Port to listen to requests on - # Default: 2000 - def self.port - @@port - end - - def self.port=(port) - @@port = port - end - - # Override the hostname at which this server is running - # Useful when the service is behind a firewall or NAT device - def self.hostname=(hostname) - @@hostname = hostname - end - - # Returns [String] hostname of the current server - def self.hostname - @@hostname ||= Socket.gethostname - end - - @@services = ThreadSafe::Hash.new - # Services currently loaded and available at this server when running def self.services @@services end - # Registers a Service Class as being available at this port + # Registers a Service Class as being available at this host and port def self.register_service(klass) - logger.debug "Registering Service: #{klass.name} with name: #{klass.service_name}" + raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:service_name) && klass.respond_to?(:service_version) + + if previous_klass = @@services[klass.service_name] && (previous_klass.name != klass.name) + logger.warn("Service with name: #{klass.service_name} is already registered to a different implementation:#{previous_klass.name}") + end @@services[klass.service_name] = klass - register_service_in_doozer(klass) if running? + @@server.register_service(klass) if @@server end - # De-register service in doozer + # De-register service def self.deregister_service(klass) - RubySkynet::Registry.doozer_pool.with_connection do |doozer| - doozer.delete(klass.service_key) rescue nil - end + raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:service_name) && klass.respond_to?(:service_version) + + @@server.deregister_service(klass) if @@server @@services.delete(klass.service_name) end + # The actual port the server is running at which will be different + # from Server.port if that port was already in use at startup + attr_reader :hostname, :port, :region + # Start the server so that it can start taking RPC calls # Returns false if the server is already running - def initialize(host, port) - # Since we included Celluloid::IO, we're actually making a - # Celluloid::IO::TCPServer here - # TODO If port is in use, try the next port in sequence - @server = TCPServer.new(host, port) - async.run + def initialize(region = 'Development', start_port = 2000, hostname = nil) + hostname ||= Common.local_ip_address + # If port is in use, try the next port in sequence + port_count = 0 + begin + @server = ::TCPServer.new(hostname, start_port + port_count) + @hostname = hostname + @port = start_port + port_count + @region = region + rescue Errno::EADDRINUSE => exc + if port_count < 999 + port_count += 1 + retry + end + raise exc + end + + # Start Server listener thread + Thread.new { run } + # Register services hosted by this server - self.class.services.each_pair {|key, klass| self.class.register_service_in_doozer(klass)} + self.class.services.each_value {|klass| register_service(klass)} end def finalize @server.close if @server logger.info "Skynet Server Stopped" # Deregister services hosted by this server - RubySkynet::Registry.doozer_pool.with_connection do |doozer| - self.class.services.each_value do |klass| - doozer.delete(klass.service_key) rescue nil - end + self.class.services.each_value do |klass| + deregister_service(klass) rescue nil end - logger.info "Skynet Services De-registered in Doozer" + logger.info "Skynet Services De-registered" end def run - logger.info("Starting listener on #{self.class.hostname}:#{self.class.port}") + logger.info("Starting listener on #{hostname}:#{port}") loop do logger.debug "Waiting for a client to connect" begin - async.handle_connection(@server.accept) - rescue Exception => exc + client = @server.accept + # We could use a thread pool here, but JRuby already does that + # and MRI threads are very light weight + Thread.new { handle_connection(client) } + rescue Errno::EBADF, IOError => exc + logger.info "TCPServer listener thread shutting down. #{exc.class}: #{exc.message}" + return + rescue ScriptError, NameError, StandardError, Exception => exc logger.error "Exception while processing connection request", exc end end end @@ -150,15 +138,15 @@ request = Common.read_bson_document(client) logger.trace 'Request', request break unless request params = BSON.deserialize(request['in']) logger.trace 'Parameters', params + reply = begin on_message(service_name, request['method'].to_sym, params) - rescue Exception => exc + rescue ScriptError, NameError, StandardError, Exception => exc logger.error "Exception while calling service: #{service_name}", exc - # TODO Return exception in header { :exception => {:message => exc.message, :class => exc.class.name} } end if reply logger.debug "Sending Header" @@ -171,10 +159,13 @@ else logger.debug "Closing client since no reply is being sent back" break end end + rescue ScriptError, NameError, StandardError, Exception => exc + logger.error "#handle_connection Exception", exc + ensure # Disconnect from the client client.close logger.debug "Disconnected from the client" end @@ -184,40 +175,31 @@ end ############################################################################ protected - # Register the supplied service in doozer - def self.register_service_in_doozer(klass) - config = { - "Config" => { - "UUID" => "#{Server.hostname}:#{Server.port}-#{$$}-#{klass.name}-#{klass.object_id}", - "Name" => klass.service_name, - "Version" => klass.service_version.to_s, - "Region" => Server.region, - "ServiceAddr" => { - "IPAddress" => Server.hostname, - "Port" => Server.port, - "MaxPort" => Server.port + 999 - }, - }, - "Registered" => true - } - RubySkynet::Registry.doozer_pool.with_connection do |doozer| - doozer[klass.service_key] = MultiJson.encode(config) - end + # Registers a Service Class as being available at this server + def register_service(klass) + logger.debug "Registering Service: #{klass.name} with name: #{klass.service_name}" + Registry.register_service(klass.service_name, klass.service_version, @region, @hostname, @port) end + # De-register service from this server + def deregister_service(klass) + logger.debug "De-registering Service: #{klass.name} with name: #{klass.service_name}" + Registry.deregister_service(klass.service_name, klass.service_version, @region, @hostname, @port) + end + # Called for each message received from the client # Returns a Hash that is sent back to the caller def on_message(service_name, method, params) - logger.benchmark_debug "Called: #{service_name}##{method}" do + logger.benchmark_info("Skynet Call: #{service_name}##{method}") do logger.trace "Method Call: #{method} with parameters:", params klass = Server.services[service_name] raise "Invalid Skynet RPC call, service: #{service_name} is not available at this server" unless klass + # TODO Use pool of services service = klass.new raise "Invalid Skynet RPC call, method: #{method} does not exist for service: #{service_name}" unless service.respond_to?(method) - # TODO Use pool of services, or Celluloid here service.send(method, params) end end end