lib/ruby_skynet/server.rb in ruby_skynet-1.3.0.alpha3 vs lib/ruby_skynet/server.rb in ruby_skynet-2.0.0.rc1
- old
+ new
@@ -7,77 +7,12 @@
#
module RubySkynet
class Server
include SemanticLogger::Loggable
- @@server = nil
- @@services = ThreadSafe::Hash.new
-
- # Start a single instance of the server
- def self.start(start_port = nil, ip_address = nil)
- @@server ||= new(start_port, ip_address)
-
- # Stop the skynet server on shutdown
- # To ensure services are de-registered in the service registry
- at_exit do
- ::RubySkynet::Server.stop
- end
- end
-
- # Stop the single instance of the server
- def self.stop
- @@server.finalize if @@server
- @@server = nil
- end
-
- # Is the single instance of the server running
- def self.running?
- (@@server != nil) && @@server.running?
- end
-
- # Wait forever until the running server stops
- def self.wait_until_server_stops
- (@@server != nil) && @@server.wait_until_server_stops
- end
-
- # Services currently loaded and available at this server when running
- def self.services
- @@services
- end
-
- # Registers a Service Class as being available at this host and port
- def self.register_service(klass)
- raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:skynet_name) && klass.respond_to?(:skynet_version) && klass.respond_to?(:skynet_region)
-
- previous_klass = @@services[klass.skynet_name]
- if previous_klass && (previous_klass.name != klass.name)
- logger.warn("Service with name: #{klass.skynet_name} is already registered to a different implementation:#{previous_klass.name}")
- end
- @@services[klass.skynet_name] = klass
- @@server.register_service(klass) if @@server
- end
-
- # De-register service
- def self.deregister_service(klass)
- raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:skynet_name) && klass.respond_to?(:skynet_version) && klass.respond_to?(:skynet_region)
-
- @@server.deregister_service(klass) if @@server
- @@services.delete(klass.skynet_name)
- end
-
- # Load and register all services found in the supplied path and it's sub-directories
- def self.load_services
- RubySkynet::Server.logger.benchmark_info "Loaded Skynet Services" do
- # Load services
- Dir.glob("#{RubySkynet.services_path}/**/*.rb").each do |path|
- load path
- end
- end
- end
-
# The actual port the server is running at
- attr_reader :hostname, :port
+ attr_reader :hostname, :port, :services
# Start the server so that it can start taking RPC calls
# Returns false if the server is already running
def initialize(start_port = nil, ip_address = nil)
ip_address ||= RubySkynet.local_ip_address
@@ -99,20 +34,20 @@
end
# Start Server listener thread
@listener_thread = Thread.new { run }
- # Register services hosted by this server
- self.class.services.each_value {|klass| register_service(klass)}
+ # Array[RubySkynet::Service] List of services registered with this server instance
+ @services = ThreadSafe::Hash.new
end
- def finalize
+ def close
@server.close if @server
logger.info "Skynet Server Stopped"
# Deregister services hosted by this server
- self.class.services.each_value do |klass|
+ @services.each_value do |klass|
deregister_service(klass) rescue nil
end
logger.info "Skynet Services De-registered"
end
@@ -126,23 +61,63 @@
@listener_thread.join
end
# Registers a Service Class as being available at this server
def register_service(klass)
+ raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:skynet_name) && klass.respond_to?(:skynet_version) && klass.respond_to?(:skynet_region)
+
+ previous_klass = @services[klass.skynet_name]
+ if previous_klass && (previous_klass.name != klass.name)
+ logger.warn("Service with name: #{klass.skynet_name} is already registered to a different implementation:#{previous_klass.name}")
+ end
+ @services[klass.skynet_name] = klass
+
logger.info "Registering Service: #{klass.name} with name: #{klass.skynet_name}"
::RubySkynet.service_registry.register_service(klass.skynet_name, klass.skynet_version || 1, klass.skynet_region, @hostname, @port)
end
# De-register service from this server
def deregister_service(klass)
+ raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:skynet_name) && klass.respond_to?(:skynet_version) && klass.respond_to?(:skynet_region)
+
logger.info "De-registering Service: #{klass.name} with name: #{klass.skynet_name}"
::RubySkynet.service_registry.deregister_service(klass.skynet_name, klass.skynet_version || 1, klass.skynet_region, @hostname, @port)
+ @services.delete(klass.skynet_name)
end
+ # Loads and registers all services found in the supplied path and it's sub-directories
+ # Returns [RubySkynet::Service] the list of Services registered
+ def register_services_in_path(path=RubySkynet.services_path)
+ logger.benchmark_info "Loaded Skynet Services" do
+ # Load services
+ klasses = []
+ Dir.glob("#{path}/**/*.rb").each do |filename|
+ partial = filename.sub(path,'').sub('.rb', '')
+ load filename
+ camelized = partial.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase }
+ begin
+ klass = constantize(camelized)
+ # Register the service
+ register_service(klass)
+ klasses << klass
+ rescue Exception => exc
+ p exc
+ raise "Expected to find class #{camelized} in file #{filename}"
+ end
+ end
+ klasses
+ end
+ end
+
############################################################################
protected
+ # Re-Register services hosted by this server in the registry
+ def re_register_services_in_registry
+ @services.each_value {|klass| register_service(klass)}
+ end
+
def run
logger.info("Starting listener on #{hostname}:#{port}")
loop do
logger.debug "Waiting for a client to connect"
begin
@@ -166,11 +141,11 @@
# Process handshake
handshake = {
'registered' => true,
'clientid' => BSON::ObjectId.new.to_s
}
- client.write(BSON.serialize(handshake).to_s)
+ client.write(handshake.to_bson)
Common.read_bson_document(client)
while(header = Common.read_bson_document(client)) do
logger.debug "\n******************"
logger.debug "Received Request"
@@ -183,11 +158,11 @@
name = match[1]
request = Common.read_bson_document(client)
logger.trace 'Request', request
break unless request
- params = BSON.deserialize(request['in'])
+ params = Hash.from_bson(StringIO.new(request['in'].data))
logger.trace 'Parameters', params
reply = begin
on_message(name, request['method'].to_sym, params)
rescue ScriptError, NameError, StandardError, Exception => exc
@@ -196,15 +171,15 @@
end
if reply
logger.debug "Sending Header"
# For this test we just send back the received header
- client.write(BSON.serialize(header).to_s)
+ client.write(header.to_bson)
logger.debug "Sending Reply"
logger.trace 'Reply', reply
- client.write(BSON.serialize('out' => BSON::Binary.new(BSON.serialize(reply))).to_s)
+ client.write({'out' => BSON::Binary.new(reply.to_bson)}.to_bson)
else
logger.debug "Closing client since no reply is being sent back"
break
end
end
@@ -219,16 +194,28 @@
# Called for each message received from the client
# Returns a Hash that is sent back to the caller
def on_message(skynet_name, method, params)
logger.benchmark_info("Skynet Call: #{skynet_name}##{method}") do
logger.trace "Method Call: #{method} with parameters:", params
- klass = Server.services[skynet_name]
+ klass = services[skynet_name]
raise "Invalid Skynet RPC call, service: #{skynet_name} is not available at this server" unless klass
# TODO Use pool of services
service = klass.new
raise "Invalid Skynet RPC call, method: #{method} does not exist for service: #{skynet_name}" unless service.respond_to?(method)
service.send(method, params)
end
+ end
+
+ # Returns the supplied camel_cased string as it's class
+ def constantize(camel_cased_word)
+ names = camel_cased_word.split('::')
+ names.shift if names.empty? || names.first.empty?
+
+ constant = Object
+ names.each do |name|
+ constant = constant.const_defined?(name) ? constant.const_get(name) : constant.const_missing(name)
+ end
+ constant
end
end
end