lib/ruby_skynet/server.rb in ruby_skynet-0.4.0.pre2 vs lib/ruby_skynet/server.rb in ruby_skynet-0.4.0
- old
+ new
@@ -1,126 +1,114 @@
require 'bson'
-require 'celluloid/io'
-# Replace Celluloid logger immediately upon loading the Server Instance
-Celluloid.logger = SemanticLogger::Logger.new('Celluloid')
-
#
# RubySkynet Server
#
# Hosts one or more Skynet Services
#
module RubySkynet
class Server
- include Celluloid::IO
include SemanticLogger::Loggable
- # TODO Make Server instance based rather than class based. Then make instance global
- @@hostname = nil
- @@port = 2000
- @@region = 'Development'
@@server = nil
+ @@services = ThreadSafe::Hash.new
- def self.start
- @@server ||= supervise(hostname, port)
+ # Start a single instance of the server
+ def self.start(region = 'Development', start_port = 2000, hostname = nil)
+ @@server ||= new(region, start_port, hostname)
end
+ # Stop the single instance of the server
def self.stop
- @@server.terminate if @@server
+ @@server.finalize if @@server
@@server = nil
end
+ # Is the single instance of the server running
def self.running?
(@@server != nil) && @@server.running?
end
- # Region under which to register Skynet services
- # Default: 'Development'
- def self.region
- @@region
- end
-
- def self.region=(region)
- @@region = region
- end
-
- # Port to listen to requests on
- # Default: 2000
- def self.port
- @@port
- end
-
- def self.port=(port)
- @@port = port
- end
-
- # Override the hostname at which this server is running
- # Useful when the service is behind a firewall or NAT device
- def self.hostname=(hostname)
- @@hostname = hostname
- end
-
- # Returns [String] hostname of the current server
- def self.hostname
- @@hostname ||= Socket.gethostname
- end
-
- @@services = ThreadSafe::Hash.new
-
# Services currently loaded and available at this server when running
def self.services
@@services
end
- # Registers a Service Class as being available at this port
+ # Registers a Service Class as being available at this host and port
def self.register_service(klass)
- logger.debug "Registering Service: #{klass.name} with name: #{klass.service_name}"
+ raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:service_name) && klass.respond_to?(:service_version)
+
+ if previous_klass = @@services[klass.service_name] && (previous_klass.name != klass.name)
+ logger.warn("Service with name: #{klass.service_name} is already registered to a different implementation:#{previous_klass.name}")
+ end
@@services[klass.service_name] = klass
- register_service_in_doozer(klass) if running?
+ @@server.register_service(klass) if @@server
end
- # De-register service in doozer
+ # De-register service
def self.deregister_service(klass)
- RubySkynet::Registry.doozer_pool.with_connection do |doozer|
- doozer.delete(klass.service_key) rescue nil
- end
+ raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:service_name) && klass.respond_to?(:service_version)
+
+ @@server.deregister_service(klass) if @@server
@@services.delete(klass.service_name)
end
+ # The actual port the server is running at which will be different
+ # from Server.port if that port was already in use at startup
+ attr_reader :hostname, :port, :region
+
# Start the server so that it can start taking RPC calls
# Returns false if the server is already running
- def initialize(host, port)
- # Since we included Celluloid::IO, we're actually making a
- # Celluloid::IO::TCPServer here
- # TODO If port is in use, try the next port in sequence
- @server = TCPServer.new(host, port)
- async.run
+ def initialize(region = 'Development', start_port = 2000, hostname = nil)
+ hostname ||= Common.local_ip_address
+ # If port is in use, try the next port in sequence
+ port_count = 0
+ begin
+ @server = ::TCPServer.new(hostname, start_port + port_count)
+ @hostname = hostname
+ @port = start_port + port_count
+ @region = region
+ rescue Errno::EADDRINUSE => exc
+ if port_count < 999
+ port_count += 1
+ retry
+ end
+ raise exc
+ end
+
+ # Start Server listener thread
+ Thread.new { run }
+
# Register services hosted by this server
- self.class.services.each_pair {|key, klass| self.class.register_service_in_doozer(klass)}
+ self.class.services.each_value {|klass| register_service(klass)}
end
def finalize
@server.close if @server
logger.info "Skynet Server Stopped"
# Deregister services hosted by this server
- RubySkynet::Registry.doozer_pool.with_connection do |doozer|
- self.class.services.each_value do |klass|
- doozer.delete(klass.service_key) rescue nil
- end
+ self.class.services.each_value do |klass|
+ deregister_service(klass) rescue nil
end
- logger.info "Skynet Services De-registered in Doozer"
+ logger.info "Skynet Services De-registered"
end
def run
- logger.info("Starting listener on #{self.class.hostname}:#{self.class.port}")
+ logger.info("Starting listener on #{hostname}:#{port}")
loop do
logger.debug "Waiting for a client to connect"
begin
- async.handle_connection(@server.accept)
- rescue Exception => exc
+ client = @server.accept
+ # We could use a thread pool here, but JRuby already does that
+ # and MRI threads are very light weight
+ Thread.new { handle_connection(client) }
+ rescue Errno::EBADF, IOError => exc
+ logger.info "TCPServer listener thread shutting down. #{exc.class}: #{exc.message}"
+ return
+ rescue ScriptError, NameError, StandardError, Exception => exc
logger.error "Exception while processing connection request", exc
end
end
end
@@ -150,15 +138,15 @@
request = Common.read_bson_document(client)
logger.trace 'Request', request
break unless request
params = BSON.deserialize(request['in'])
logger.trace 'Parameters', params
+
reply = begin
on_message(service_name, request['method'].to_sym, params)
- rescue Exception => exc
+ rescue ScriptError, NameError, StandardError, Exception => exc
logger.error "Exception while calling service: #{service_name}", exc
- # TODO Return exception in header
{ :exception => {:message => exc.message, :class => exc.class.name} }
end
if reply
logger.debug "Sending Header"
@@ -171,10 +159,13 @@
else
logger.debug "Closing client since no reply is being sent back"
break
end
end
+ rescue ScriptError, NameError, StandardError, Exception => exc
+ logger.error "#handle_connection Exception", exc
+ ensure
# Disconnect from the client
client.close
logger.debug "Disconnected from the client"
end
@@ -184,40 +175,31 @@
end
############################################################################
protected
- # Register the supplied service in doozer
- def self.register_service_in_doozer(klass)
- config = {
- "Config" => {
- "UUID" => "#{Server.hostname}:#{Server.port}-#{$$}-#{klass.name}-#{klass.object_id}",
- "Name" => klass.service_name,
- "Version" => klass.service_version.to_s,
- "Region" => Server.region,
- "ServiceAddr" => {
- "IPAddress" => Server.hostname,
- "Port" => Server.port,
- "MaxPort" => Server.port + 999
- },
- },
- "Registered" => true
- }
- RubySkynet::Registry.doozer_pool.with_connection do |doozer|
- doozer[klass.service_key] = MultiJson.encode(config)
- end
+ # Registers a Service Class as being available at this server
+ def register_service(klass)
+ logger.debug "Registering Service: #{klass.name} with name: #{klass.service_name}"
+ Registry.register_service(klass.service_name, klass.service_version, @region, @hostname, @port)
end
+ # De-register service from this server
+ def deregister_service(klass)
+ logger.debug "De-registering Service: #{klass.name} with name: #{klass.service_name}"
+ Registry.deregister_service(klass.service_name, klass.service_version, @region, @hostname, @port)
+ end
+
# Called for each message received from the client
# Returns a Hash that is sent back to the caller
def on_message(service_name, method, params)
- logger.benchmark_debug "Called: #{service_name}##{method}" do
+ logger.benchmark_info("Skynet Call: #{service_name}##{method}") do
logger.trace "Method Call: #{method} with parameters:", params
klass = Server.services[service_name]
raise "Invalid Skynet RPC call, service: #{service_name} is not available at this server" unless klass
+ # TODO Use pool of services
service = klass.new
raise "Invalid Skynet RPC call, method: #{method} does not exist for service: #{service_name}" unless service.respond_to?(method)
- # TODO Use pool of services, or Celluloid here
service.send(method, params)
end
end
end