lib/ruby_skynet/server.rb in ruby_skynet-0.3.0 vs lib/ruby_skynet/server.rb in ruby_skynet-0.4.0.pre
- old
+ new
@@ -18,10 +18,23 @@
@@hostname = nil
@@port = 2000
@@region = 'Development'
@@server = nil
+ def self.start
+ @@server ||= supervise(hostname, port)
+ end
+
+ def self.stop
+ @@server.terminate if @@server
+ @@server = nil
+ end
+
+ def self.running?
+ (@@server != nil) && @@server.running?
+ end
+
# Region under which to register Skynet services
# Default: 'Development'
def self.region
@@region
end
@@ -71,27 +84,23 @@
doozer.delete(klass.service_key) rescue nil
end
@@services.delete(klass.service_name)
end
- # Returns whether the server is running
- def self.running?
- (@@server != nil) && @@server.running?
- end
+ # Start the server so that it can start taking RPC calls
+ # Returns false if the server is already running
+ def initialize(host, port)
+ # Since we included Celluloid::IO, we're actually making a
+ # Celluloid::IO::TCPServer here
+ # TODO If port is in use, try the next port in sequence
+ @server = TCPServer.new(host, port)
+ async.run
- # Start the Server
- def self.start
- @@server = new
- @@server.start
+ # Register services hosted by this server
+ self.class.services.each_pair {|key, klass| self.class.register_service_in_doozer(klass)}
end
- # Stop the Server
- def self.stop
- @@server.terminate
- @@server = nil
- end
-
def finalize
@server.close if @server
logger.info "Skynet Server Stopped"
# Deregister services hosted by this server
@@ -101,94 +110,32 @@
end
end
logger.info "Skynet Services De-registered in Doozer"
end
- # Returns whether the server is running
- def running?
- (@server != nil) && !@server.closed?
- end
-
- ############################################################################
- protected
-
- attr_accessor :server
-
- # Register the supplied service in doozer
- def self.register_service_in_doozer(klass)
- config = {
- "Config" => {
- "UUID" => "#{Server.hostname}:#{Server.port}-#{$$}-#{klass.name}-#{klass.object_id}",
- "Name" => klass.service_name,
- "Version" => klass.service_version.to_s,
- "Region" => Server.region,
- "ServiceAddr" => {
- "IPAddress" => Server.hostname,
- "Port" => Server.port,
- "MaxPort" => Server.port + 999
- },
- },
- "Registered" => true
- }
- RubySkynet::Registry.doozer_pool.with_connection do |doozer|
- doozer[klass.service_key] = MultiJson.encode(config)
- end
- end
-
- # Start the server so that it can start taking RPC calls
- # Returns false if the server is already running
- def start
- return false if running?
-
- # Since we included Celluloid::IO, we're actually making a
- # Celluloid::IO::TCPServer here
- # TODO If port is in use, try the next port in sequence
- # TODO make port to listen on configurable
- @server = TCPServer.new('0.0.0.0', self.class.port)
- run!
-
- # Register services hosted by this server
- self.class.services.each_pair {|key, klass| self.class.register_service_in_doozer(klass)}
- true
- end
-
def run
logger.info("Starting listener on #{self.class.hostname}:#{self.class.port}")
loop do
logger.debug "Waiting for a client to connect"
begin
- handle_connection!(@server.accept)
+ async.handle_connection(@server.accept)
rescue Exception => exc
logger.error "Exception while processing connection request", exc
end
end
end
- # Called for each message received from the client
- # Returns a Hash that is sent back to the caller
- def on_message(service_name, method, params)
- logger.benchmark_debug "Called: #{service_name}##{method}" do
- logger.trace "Method Call: #{method} with parameters:", params
- klass = Server.services[service_name]
- raise "Invalid Skynet RPC call, service: #{service_name} is not available at this server" unless klass
- service = klass.new
- raise "Invalid Skynet RPC call, method: #{method} does not exist for service: #{service_name}" unless service.respond_to?(method)
- # TODO Use pool of services, or Celluloid here
- service.send(method, params)
- end
- end
-
# Called for each client connection
def handle_connection(client)
logger.debug "Client connected, waiting for data from client"
# Process handshake
handshake = {
'registered' => true,
'clientid' => BSON::ObjectId.new.to_s
}
- client.write(BSON.serialize(handshake))
+ client.write(BSON.serialize(handshake).to_s)
Common.read_bson_document(client)
while(header = Common.read_bson_document(client)) do
logger.debug "\n******************"
logger.debug "Received Request"
@@ -214,22 +161,65 @@
end
if reply
logger.debug "Sending Header"
# For this test we just send back the received header
- client.write(BSON.serialize(header))
+ client.write(BSON.serialize(header).to_s)
logger.debug "Sending Reply"
logger.trace 'Reply', reply
- client.write(BSON.serialize({'out' => BSON.serialize(reply).to_s}))
+ client.write(BSON.serialize({'out' => BSON.serialize(reply).to_s}).to_s)
else
logger.debug "Closing client since no reply is being sent back"
break
end
end
# Disconnect from the client
client.close
logger.debug "Disconnected from the client"
+ end
+
+ # Returns whether the server is running
+ def running?
+ (@server != nil) && !@server.closed?
+ end
+
+ ############################################################################
+ protected
+
+ # Register the supplied service in doozer
+ def self.register_service_in_doozer(klass)
+ config = {
+ "Config" => {
+ "UUID" => "#{Server.hostname}:#{Server.port}-#{$$}-#{klass.name}-#{klass.object_id}",
+ "Name" => klass.service_name,
+ "Version" => klass.service_version.to_s,
+ "Region" => Server.region,
+ "ServiceAddr" => {
+ "IPAddress" => Server.hostname,
+ "Port" => Server.port,
+ "MaxPort" => Server.port + 999
+ },
+ },
+ "Registered" => true
+ }
+ RubySkynet::Registry.doozer_pool.with_connection do |doozer|
+ doozer[klass.service_key] = MultiJson.encode(config)
+ end
+ end
+
+ # Called for each message received from the client
+ # Returns a Hash that is sent back to the caller
+ def on_message(service_name, method, params)
+ logger.benchmark_debug "Called: #{service_name}##{method}" do
+ logger.trace "Method Call: #{method} with parameters:", params
+ klass = Server.services[service_name]
+ raise "Invalid Skynet RPC call, service: #{service_name} is not available at this server" unless klass
+ service = klass.new
+ raise "Invalid Skynet RPC call, method: #{method} does not exist for service: #{service_name}" unless service.respond_to?(method)
+ # TODO Use pool of services, or Celluloid here
+ service.send(method, params)
+ end
end
end
end