lib/ruby_skynet/server.rb in ruby_skynet-0.4.0 vs lib/ruby_skynet/server.rb in ruby_skynet-0.5.0
- old
+ new
@@ -11,12 +11,18 @@
@@server = nil
@@services = ThreadSafe::Hash.new
# Start a single instance of the server
- def self.start(region = 'Development', start_port = 2000, hostname = nil)
- @@server ||= new(region, start_port, hostname)
+ def self.start(start_port = nil, ip_address = nil)
+ @@server ||= new(start_port, ip_address)
+
+ # Stop the skynet server on shutdown
+ # To ensure services are de-registered in doozer
+ at_exit do
+ ::RubySkynet::Server.stop
+ end
end
# Stop the single instance of the server
def self.stop
@@server.finalize if @@server
@@ -26,60 +32,75 @@
# Is the single instance of the server running
def self.running?
(@@server != nil) && @@server.running?
end
+ # Wait forever until the running server stops
+ def self.wait_until_server_stops
+ (@@server != nil) && @@server.wait_until_server_stops
+ end
+
# Services currently loaded and available at this server when running
def self.services
@@services
end
# Registers a Service Class as being available at this host and port
def self.register_service(klass)
- raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:service_name) && klass.respond_to?(:service_version)
+ raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:skynet_name) && klass.respond_to?(:skynet_version) && klass.respond_to?(:skynet_region)
- if previous_klass = @@services[klass.service_name] && (previous_klass.name != klass.name)
- logger.warn("Service with name: #{klass.service_name} is already registered to a different implementation:#{previous_klass.name}")
+ if previous_klass = @@services[klass.skynet_name] && (previous_klass.name != klass.name)
+ logger.warn("Service with name: #{klass.skynet_name} is already registered to a different implementation:#{previous_klass.name}")
end
- @@services[klass.service_name] = klass
+ @@services[klass.skynet_name] = klass
@@server.register_service(klass) if @@server
end
# De-register service
def self.deregister_service(klass)
- raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:service_name) && klass.respond_to?(:service_version)
+ raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:skynet_name) && klass.respond_to?(:skynet_version) && klass.respond_to?(:skynet_region)
@@server.deregister_service(klass) if @@server
- @@services.delete(klass.service_name)
+ @@services.delete(klass.skynet_name)
end
- # The actual port the server is running at which will be different
- # from Server.port if that port was already in use at startup
- attr_reader :hostname, :port, :region
+ # Load and register all services found in the supplied path and it's sub-directories
+ def self.load_services
+ RubySkynet::Server.logger.benchmark_info "Loaded Skynet Services" do
+ # Load services
+ Dir.glob("#{RubySkynet.services_path}/**/*.rb").each do |path|
+ load path
+ end
+ end
+ end
+ # The actual port the server is running at
+ attr_reader :hostname, :port
+
# Start the server so that it can start taking RPC calls
# Returns false if the server is already running
- def initialize(region = 'Development', start_port = 2000, hostname = nil)
- hostname ||= Common.local_ip_address
+ def initialize(start_port = nil, ip_address = nil)
+ ip_address ||= RubySkynet.local_ip_address
+ start_port = (start_port || RubySkynet.server_port).to_i
+ raise InvalidConfigurationException.new("Invalid Starting Port number: #{start_port}") unless start_port > 0
# If port is in use, try the next port in sequence
port_count = 0
begin
- @server = ::TCPServer.new(hostname, start_port + port_count)
- @hostname = hostname
+ @server = ::TCPServer.new(ip_address, start_port + port_count)
+ @hostname = ip_address
@port = start_port + port_count
- @region = region
rescue Errno::EADDRINUSE => exc
if port_count < 999
port_count += 1
retry
end
raise exc
end
# Start Server listener thread
- Thread.new { run }
+ @listener_thread = Thread.new { run }
# Register services hosted by this server
self.class.services.each_value {|klass| register_service(klass)}
end
@@ -92,10 +113,23 @@
deregister_service(klass) rescue nil
end
logger.info "Skynet Services De-registered"
end
+ # Returns whether the server is running
+ def running?
+ (@server != nil) && !@server.closed?
+ end
+
+ # Wait forever until the running server stops
+ def wait_until_server_stops
+ @listener_thread.join
+ end
+
+ ############################################################################
+ protected
+
def run
logger.info("Starting listener on #{hostname}:#{port}")
loop do
logger.debug "Waiting for a client to connect"
begin
@@ -127,26 +161,26 @@
while(header = Common.read_bson_document(client)) do
logger.debug "\n******************"
logger.debug "Received Request"
logger.trace 'Header', header
- service_name = header['servicemethod'] # "#{service_name}.Forward",
- raise "Invalid Skynet RPC Request, missing servicemethod" unless service_name
- match = service_name.match /(.*)\.Forward$/
+ name = header['servicemethod']
+ raise "Invalid Skynet RPC Request, missing servicemethod" unless name
+ match = name.match /(.*)\.Forward$/
raise "Invalid Skynet RPC Request, servicemethod must end with '.Forward'" unless match
- service_name = match[1]
+ name = match[1]
request = Common.read_bson_document(client)
logger.trace 'Request', request
break unless request
params = BSON.deserialize(request['in'])
logger.trace 'Parameters', params
-
+
reply = begin
- on_message(service_name, request['method'].to_sym, params)
+ on_message(name, request['method'].to_sym, params)
rescue ScriptError, NameError, StandardError, Exception => exc
- logger.error "Exception while calling service: #{service_name}", exc
+ logger.error "Exception while calling service: #{name}", exc
{ :exception => {:message => exc.message, :class => exc.class.name} }
end
if reply
logger.debug "Sending Header"
@@ -167,39 +201,31 @@
# Disconnect from the client
client.close
logger.debug "Disconnected from the client"
end
- # Returns whether the server is running
- def running?
- (@server != nil) && !@server.closed?
- end
-
- ############################################################################
- protected
-
# Registers a Service Class as being available at this server
def register_service(klass)
- logger.debug "Registering Service: #{klass.name} with name: #{klass.service_name}"
- Registry.register_service(klass.service_name, klass.service_version, @region, @hostname, @port)
+ logger.debug "Registering Service: #{klass.name} with name: #{klass.skynet_name}"
+ Registry.register_service(klass.skynet_name, klass.skynet_version || 1, klass.skynet_region, @hostname, @port)
end
# De-register service from this server
def deregister_service(klass)
- logger.debug "De-registering Service: #{klass.name} with name: #{klass.service_name}"
- Registry.deregister_service(klass.service_name, klass.service_version, @region, @hostname, @port)
+ logger.debug "De-registering Service: #{klass.name} with name: #{klass.skynet_name}"
+ Registry.deregister_service(klass.skynet_name, klass.skynet_version || 1, klass.skynet_region, @hostname, @port)
end
# Called for each message received from the client
# Returns a Hash that is sent back to the caller
- def on_message(service_name, method, params)
- logger.benchmark_info("Skynet Call: #{service_name}##{method}") do
+ def on_message(skynet_name, method, params)
+ logger.benchmark_info("Skynet Call: #{skynet_name}##{method}") do
logger.trace "Method Call: #{method} with parameters:", params
- klass = Server.services[service_name]
- raise "Invalid Skynet RPC call, service: #{service_name} is not available at this server" unless klass
+ klass = Server.services[skynet_name]
+ raise "Invalid Skynet RPC call, service: #{skynet_name} is not available at this server" unless klass
# TODO Use pool of services
service = klass.new
- raise "Invalid Skynet RPC call, method: #{method} does not exist for service: #{service_name}" unless service.respond_to?(method)
+ raise "Invalid Skynet RPC call, method: #{method} does not exist for service: #{skynet_name}" unless service.respond_to?(method)
service.send(method, params)
end
end
end