lib/ruby_skynet/registry.rb in ruby_skynet-0.4.0.pre2 vs lib/ruby_skynet/registry.rb in ruby_skynet-0.4.0
- old
+ new
@@ -1,9 +1,10 @@
require 'sync_attr'
require 'multi_json'
require 'thread_safe'
require 'gene_pool'
+require 'resolv'
#
# RubySkynet Registry Client
#
# Keeps a local copy of the Skynet Registry
@@ -15,12 +16,12 @@
include SyncAttr
# Service Registry has the following format
# Key: [String] 'service_name/version/region'
# Value: [Array<String>] 'host:port', 'host:port'
- sync_cattr_accessor :service_registry do
- start_monitoring
+ sync_cattr_reader :service_registry do
+ start
end
@@on_server_removed_callbacks = ThreadSafe::Hash.new
@@monitor_thread = nil
@@ -59,28 +60,36 @@
:connect_retry_interval => 1,
:connect_retry_count => 30
}
end
- # Lazy initialize Doozer Client Connection pool
- sync_cattr_reader :doozer_pool do
- GenePool.new(
- :name =>"Doozer Connection Pool",
- :pool_size => 5,
- :timeout => 30,
- :warn_timeout => 5,
- :idle_timeout => 600,
- :logger => logger,
- :close_proc => :close
- ) do
- Doozer::Client.new(doozer_config)
+ # Register the supplied service at this Skynet Server host and Port
+ def self.register_service(name, version, region, hostname, port)
+ config = {
+ "Config" => {
+ "UUID" => "#{hostname}:#{port}-#{$$}-#{name}-#{version}",
+ "Name" => name,
+ "Version" => version.to_s,
+ "Region" => region,
+ "ServiceAddr" => {
+ "IPAddress" => hostname,
+ "Port" => port,
+ "MaxPort" => port + 999
+ },
+ },
+ "Registered" => true
+ }
+ doozer_pool.with_connection do |doozer|
+ doozer["/services/#{name}/#{version}/#{region}/#{hostname}/#{port}"] = MultiJson.encode(config)
end
end
- # Logging instance for this class
- sync_cattr_reader :logger do
- SemanticLogger::Logger.new(self, :debug)
+ # Deregister the supplied service from the Registry
+ def self.deregister_service(name, version, region, hostname, port)
+ doozer_pool.with_connection do |doozer|
+ doozer.delete("/services/#{name}/#{version}/#{region}/#{hostname}/#{port}") rescue nil
+ end
end
# Return a server that implements the specified service
def self.server_for(service_name, version='*', region='Development')
if servers = servers_for(service_name, version, region)
@@ -113,129 +122,154 @@
end
hosts
end
# Returns [Array<String>] a list of servers implementing the requested service
- def self.servers_for(service_name, version='*', region='Development', remote = false)
- if remote
- if version != '*'
- registered_implementers(service_name, version, region).map do |host|
- service = host['Config']['ServiceAddr']
- "#{service['IPAddress']}:#{service['Port']}"
+ def self.servers_for(service_name, version='*', region='Development')
+ if version == '*'
+ # Find the highest version for the named service in this region
+ version = -1
+ service_registry.keys.each do |key|
+ if match = key.match(/#{service_name}\/(\d+)\/#{region}/)
+ ver = match[1].to_i
+ version = ver if ver > version
end
- else
- # Find the highest version of any particular service
- versions = {}
- registered_implementers(service_name, version, region).each do |host|
- service = host['Config']['ServiceAddr']
- (versions[version.to_i] ||= []) << "#{service['IPAddress']}:#{service['Port']}"
- end
- # Return the servers implementing the highest version number
- versions.sort.last.last
end
- else
- if version == '*'
- # Find the highest version for the named service in this region
- version = -1
- service_registry.keys.each do |key|
- if match = key.match(/#{service_name}\/(\d+)\/#{region}/)
- ver = match[1].to_i
- version = ver if ver > version
- end
- end
- end
- service_registry["#{service_name}/#{version}/#{region}"]
end
+ if server_infos = service_registry["#{service_name}/#{version}/#{region}"]
+ server_infos.first.servers
+ end
end
# Invokes registered callbacks when a specific server is shutdown or terminates
# Not when a server de-registers itself
# The callback will only be called once and will need to be re-registered
# after being called if future callbacks are required for that server
def self.on_server_removed(server, &block)
(@@on_server_removed_callbacks[server] ||= ThreadSafe::Array.new) << block
end
+ IPV4_REG_EXP = /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/
+
+ # Returns [Integer] the score for the supplied ip_address
+ # Score currently ranges from 0 to 4 with 4 being the best score
+ # If the IP address does not match an IP v4 address a DNS lookup will
+ # be performed
+ def self.score_for_server(ip_address)
+ score = 0
+ # Each matching element adds 1 to the score
+ # 192.168. 0. 0
+ # 1
+ # 1
+ # 1
+ # 1
+ server_match = IPV4_REG_EXP.match(ip_address) || IPV4_REG_EXP.match(Resolv::DNS.new.getaddress(ip_address).to_s)
+ if server_match
+ @@local_match ||= IPV4_REG_EXP.match(Common.local_ip_address)
+ score = 0
+ (1..4).each do |i|
+ break if @@local_match[i].to_i != server_match[i].to_i
+ score += 1
+ end
+ end
+ score
+ end
+
############################
- #protected
+ protected
- # Fetch the all registry information from Doozer and set the internal registry
+ # Logging instance for this class
+ sync_cattr_reader :logger do
+ SemanticLogger::Logger.new(self, :debug)
+ end
+
+ # Lazy initialize Doozer Client Connection pool
+ sync_cattr_reader :doozer_pool do
+ GenePool.new(
+ :name =>"Doozer Connection Pool",
+ :pool_size => 5,
+ :timeout => 30,
+ :warn_timeout => 5,
+ :idle_timeout => 600,
+ :logger => logger,
+ :close_proc => :close
+ ) do
+ Doozer::Client.new(doozer_config)
+ end
+ end
+
+ # Fetch the all registry information from Doozer and sets the internal registry
# Also starts the monitoring thread to keep the registry up to date
- def self.start_monitoring
+ def self.start
+ # Populate internal registry from doozer server
+ # Holds a lock in this process on the service_registry so that only
+ # this thread will pre-populate the local copy of the registry
registry = ThreadSafe::Hash.new
revision = nil
doozer_pool.with_connection do |doozer|
revision = doozer.current_revision
doozer.walk(DOOZER_SERVICES_PATH, revision).each do |node|
- # path: "/services/TutorialService/1/Development/127.0.0.1/9000"
- e = node.path.split('/')
-
- # Key: [String] 'service_name/version/region'
- key = "#{e[2]}/#{e[3]}/#{e[4]}"
- server = "#{e[5]}:#{e[6]}"
-
- if node.value.strip.size > 0
- entry = MultiJson.load(node.value)
- if entry['Registered']
- # Value: [Array<String>] 'host:port', 'host:port'
- servers = (registry[key] ||= ThreadSafe::Array.new)
- servers << server unless servers.include?(server)
- logger.debug "#start_monitoring Add Service: #{key} => #{server}"
- end
- end
+ service_info_change(registry, node.path, node.value)
end
end
# Start monitoring thread to keep the registry up to date
- @@monitor_thread = Thread.new { self.watch(revision + 1) }
+ @@monitor_thread = Thread.new { watch_registry(revision + 1) }
+
+ # Cleanup when process exits
+ at_exit do
+ if @@monitor_thread
+ @@monitor_thread.kill
+ @@monitor_thread.join
+ @@monitor_thread = nil
+ end
+ doozer_pool.close
+ end
registry
end
# Waits for any updates from Doozer and updates the internal service registry
- def self.watch(revision)
+ def self.watch_registry(revision)
logger.info "Start monitoring #{DOOZER_SERVICES_PATH}"
# This thread must use its own dedicated doozer connection
doozer = Doozer::Client.new(doozer_config)
- doozer.watch(DOOZER_SERVICES_PATH, revision) do |node|
- # path: "/services/TutorialService/1/Development/127.0.0.1/9000"
- e = node.path.split('/')
- # Key: [String] 'service_name/version/region'
- key = "#{e[2]}/#{e[3]}/#{e[4]}"
- server = "#{e[5]}:#{e[6]}"
-
- if node.value.strip.size > 0
- entry = MultiJson.load(node.value)
- if entry['Registered']
- # Value: [Array<String>] 'host:port', 'host:port'
- servers = (@@service_registry[key] ||= ThreadSafe::Array.new)
- servers << server unless servers.include?(server)
- logger.debug "#monitor Add/Update Service: #{key} => #{server}"
- else
- logger.debug "#monitor Service deregistered, remove: #{key} => #{server}"
- if @@service_registry[key]
- @@service_registry[key].delete(server)
- @@service_registry.delete(key) if @@service_registry[key].size == 0
- end
- end
- else
- # Service has stopped and needs to be removed
- logger.debug "#monitor Service stopped, remove: #{key} => #{server}"
- if @@service_registry[key]
- @@service_registry[key].delete(server)
- @@service_registry.delete(key) if @@service_registry[key].size == 0
- server_removed(server)
- end
- end
- logger.debug "Updated registry", @@service_registry
+ # Watch for any changes
+ doozer.watch(DOOZER_SERVICES_PATH, revision) do |node|
+ service_info_change(service_registry, node.path, node.value)
+ logger.trace "Updated registry", service_registry
end
logger.info "Stopping monitoring thread normally"
rescue Exception => exc
logger.error "Exception in monitoring thread", exc
ensure
- logger.info "Stopped monitoring"
+ doozer.close if doozer
+ logger.info "Stopped monitoring for changes in the doozer registry"
end
+ # Service information changed in doozer, so update internal registry
+ def self.service_info_change(registry, path, value)
+ # path from doozer: "/services/TutorialService/1/Development/127.0.0.1/9000"
+ e = path.split('/')
+
+ # Key: [String] 'service_name/version/region'
+ key = "#{e[2]}/#{e[3]}/#{e[4]}"
+ hostname, port = e[5], e[6]
+
+ if value.strip.size > 0
+ entry = MultiJson.load(value)
+ if entry['Registered']
+ add_server(registry, key, hostname, port)
+ else
+ # Service just de-registered
+ remove_server(registry, key, hostname, port, false)
+ end
+ else
+ # Service has stopped and needs to be removed
+ remove_server(registry, key, hostname, port, true)
+ end
+ end
+
# Invoke any registered callbacks for the specific server
def self.server_removed(server)
if callbacks = @@on_server_removed_callbacks.delete(server)
callbacks.each do |block|
begin
@@ -243,9 +277,98 @@
block.call(server)
rescue Exception => exc
logger.error("Exception during a callback for server: #{server}", exc)
end
end
+ end
+ end
+
+ # :score: [Integer] Score
+ # :servers: [Array<String>] 'host:port', 'host:port'
+ ServerInfo = Struct.new(:score, :servers )
+
+ # Format of the internal services registry
+ # key: [String] "<service_name>/<version>/<region>"
+ # value: [ServiceInfo, ServiceInfo]
+ # Sorted by highest score first
+
+ # Add the host to the registry based on it's score
+ def self.add_server(registry, key, hostname, port)
+ server = "#{hostname}:#{port}"
+ logger.debug "#monitor Add/Update Service: #{key} => #{server.inspect}"
+
+ server_infos = (registry[key] ||= ThreadSafe::Array.new)
+
+ # If already present, then nothing to do
+ server_info = server_infos.find{|si| si.server == server}
+ return server_info if server_info
+
+ # Look for the same score with a different server
+ score = score_for_server(hostname)
+ if server_info = server_infos.find{|si| si.score == score}
+ server_info.servers << server
+ return server_info
+ end
+
+ # New score
+ servers = ThreadSafe::Array.new
+ servers << server
+ server_info = ServerInfo.new(score, servers)
+
+ # Insert into Array in order of score
+ if index = server_infos.find_index {|si| si.score <= score}
+ server_infos.insert(index, server_info)
+ else
+ server_infos << server_info
+ end
+ server_info
+ end
+
+ # Remove the host from the registry based
+ # Returns the server instance if it was removed
+ def self.remove_server(registry, key, hostname, port, notify)
+ server = "#{hostname}:#{port}"
+ logger.debug "Remove Service: #{key} => #{server.inspect}"
+ server_info = nil
+ if server_infos = registry[key]
+ server_infos.each do |si|
+ if si.servers.delete(server)
+ server_info = si
+ break
+ end
+ end
+
+ # Found server
+ if server_info
+ # Cleanup if no more servers in server list
+ server_infos.delete(server_info) if server_info.servers.size == 0
+
+ # Cleanup if no more server infos
+ registry.delete(key) if server_infos.size == 0
+
+ server_removed(server) if notify
+ end
+ end
+ server_info
+ end
+
+ # Check doozer for servers matching supplied criteria
+ # Code unused, consider deleting
+ def self.remote_servers_for(service_name, version='*', region='Development')
+ if version != '*'
+ registered_implementers(service_name, version, region).map do |host|
+ service = host['Config']['ServiceAddr']
+ "#{service['IPAddress']}:#{service['Port']}"
+ end
+ else
+ # Find the highest version of any particular service
+ versions = {}
+ registered_implementers(service_name, version, region).each do |host|
+ service = host['Config']['ServiceAddr']
+ (versions[version.to_i] ||= []) << "#{service['IPAddress']}:#{service['Port']}"
+ end
+ # Return the servers implementing the highest version number
+ versions.sort.last.last
end
end
end
end