lib/ruby_skynet/server.rb in ruby_skynet-1.3.0.alpha3 vs lib/ruby_skynet/server.rb in ruby_skynet-2.0.0.rc1

- old
+ new

@@ -7,77 +7,12 @@ # module RubySkynet class Server include SemanticLogger::Loggable - @@server = nil - @@services = ThreadSafe::Hash.new - - # Start a single instance of the server - def self.start(start_port = nil, ip_address = nil) - @@server ||= new(start_port, ip_address) - - # Stop the skynet server on shutdown - # To ensure services are de-registered in the service registry - at_exit do - ::RubySkynet::Server.stop - end - end - - # Stop the single instance of the server - def self.stop - @@server.finalize if @@server - @@server = nil - end - - # Is the single instance of the server running - def self.running? - (@@server != nil) && @@server.running? - end - - # Wait forever until the running server stops - def self.wait_until_server_stops - (@@server != nil) && @@server.wait_until_server_stops - end - - # Services currently loaded and available at this server when running - def self.services - @@services - end - - # Registers a Service Class as being available at this host and port - def self.register_service(klass) - raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:skynet_name) && klass.respond_to?(:skynet_version) && klass.respond_to?(:skynet_region) - - previous_klass = @@services[klass.skynet_name] - if previous_klass && (previous_klass.name != klass.name) - logger.warn("Service with name: #{klass.skynet_name} is already registered to a different implementation:#{previous_klass.name}") - end - @@services[klass.skynet_name] = klass - @@server.register_service(klass) if @@server - end - - # De-register service - def self.deregister_service(klass) - raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:skynet_name) && klass.respond_to?(:skynet_version) && klass.respond_to?(:skynet_region) - - @@server.deregister_service(klass) if @@server - @@services.delete(klass.skynet_name) - end - - # Load and register all services found in the supplied path and it's sub-directories - def self.load_services - RubySkynet::Server.logger.benchmark_info "Loaded Skynet Services" do - # Load services - Dir.glob("#{RubySkynet.services_path}/**/*.rb").each do |path| - load path - end - end - end - # The actual port the server is running at - attr_reader :hostname, :port + attr_reader :hostname, :port, :services # Start the server so that it can start taking RPC calls # Returns false if the server is already running def initialize(start_port = nil, ip_address = nil) ip_address ||= RubySkynet.local_ip_address @@ -99,20 +34,20 @@ end # Start Server listener thread @listener_thread = Thread.new { run } - # Register services hosted by this server - self.class.services.each_value {|klass| register_service(klass)} + # Array[RubySkynet::Service] List of services registered with this server instance + @services = ThreadSafe::Hash.new end - def finalize + def close @server.close if @server logger.info "Skynet Server Stopped" # Deregister services hosted by this server - self.class.services.each_value do |klass| + @services.each_value do |klass| deregister_service(klass) rescue nil end logger.info "Skynet Services De-registered" end @@ -126,23 +61,63 @@ @listener_thread.join end # Registers a Service Class as being available at this server def register_service(klass) + raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:skynet_name) && klass.respond_to?(:skynet_version) && klass.respond_to?(:skynet_region) + + previous_klass = @services[klass.skynet_name] + if previous_klass && (previous_klass.name != klass.name) + logger.warn("Service with name: #{klass.skynet_name} is already registered to a different implementation:#{previous_klass.name}") + end + @services[klass.skynet_name] = klass + logger.info "Registering Service: #{klass.name} with name: #{klass.skynet_name}" ::RubySkynet.service_registry.register_service(klass.skynet_name, klass.skynet_version || 1, klass.skynet_region, @hostname, @port) end # De-register service from this server def deregister_service(klass) + raise InvalidServiceException.new("#{klass.inspect} is not a RubySkynet::Service") unless klass.respond_to?(:skynet_name) && klass.respond_to?(:skynet_version) && klass.respond_to?(:skynet_region) + logger.info "De-registering Service: #{klass.name} with name: #{klass.skynet_name}" ::RubySkynet.service_registry.deregister_service(klass.skynet_name, klass.skynet_version || 1, klass.skynet_region, @hostname, @port) + @services.delete(klass.skynet_name) end + # Loads and registers all services found in the supplied path and it's sub-directories + # Returns [RubySkynet::Service] the list of Services registered + def register_services_in_path(path=RubySkynet.services_path) + logger.benchmark_info "Loaded Skynet Services" do + # Load services + klasses = [] + Dir.glob("#{path}/**/*.rb").each do |filename| + partial = filename.sub(path,'').sub('.rb', '') + load filename + camelized = partial.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase } + begin + klass = constantize(camelized) + # Register the service + register_service(klass) + klasses << klass + rescue Exception => exc + p exc + raise "Expected to find class #{camelized} in file #{filename}" + end + end + klasses + end + end + ############################################################################ protected + # Re-Register services hosted by this server in the registry + def re_register_services_in_registry + @services.each_value {|klass| register_service(klass)} + end + def run logger.info("Starting listener on #{hostname}:#{port}") loop do logger.debug "Waiting for a client to connect" begin @@ -166,11 +141,11 @@ # Process handshake handshake = { 'registered' => true, 'clientid' => BSON::ObjectId.new.to_s } - client.write(BSON.serialize(handshake).to_s) + client.write(handshake.to_bson) Common.read_bson_document(client) while(header = Common.read_bson_document(client)) do logger.debug "\n******************" logger.debug "Received Request" @@ -183,11 +158,11 @@ name = match[1] request = Common.read_bson_document(client) logger.trace 'Request', request break unless request - params = BSON.deserialize(request['in']) + params = Hash.from_bson(StringIO.new(request['in'].data)) logger.trace 'Parameters', params reply = begin on_message(name, request['method'].to_sym, params) rescue ScriptError, NameError, StandardError, Exception => exc @@ -196,15 +171,15 @@ end if reply logger.debug "Sending Header" # For this test we just send back the received header - client.write(BSON.serialize(header).to_s) + client.write(header.to_bson) logger.debug "Sending Reply" logger.trace 'Reply', reply - client.write(BSON.serialize('out' => BSON::Binary.new(BSON.serialize(reply))).to_s) + client.write({'out' => BSON::Binary.new(reply.to_bson)}.to_bson) else logger.debug "Closing client since no reply is being sent back" break end end @@ -219,16 +194,28 @@ # Called for each message received from the client # Returns a Hash that is sent back to the caller def on_message(skynet_name, method, params) logger.benchmark_info("Skynet Call: #{skynet_name}##{method}") do logger.trace "Method Call: #{method} with parameters:", params - klass = Server.services[skynet_name] + klass = services[skynet_name] raise "Invalid Skynet RPC call, service: #{skynet_name} is not available at this server" unless klass # TODO Use pool of services service = klass.new raise "Invalid Skynet RPC call, method: #{method} does not exist for service: #{skynet_name}" unless service.respond_to?(method) service.send(method, params) end + end + + # Returns the supplied camel_cased string as it's class + def constantize(camel_cased_word) + names = camel_cased_word.split('::') + names.shift if names.empty? || names.first.empty? + + constant = Object + names.each do |name| + constant = constant.const_defined?(name) ? constant.const_get(name) : constant.const_missing(name) + end + constant end end end