lib/ruby_skynet/registry.rb in ruby_skynet-0.4.0.pre2 vs lib/ruby_skynet/registry.rb in ruby_skynet-0.4.0

- old
+ new

@@ -1,9 +1,10 @@ require 'sync_attr' require 'multi_json' require 'thread_safe' require 'gene_pool' +require 'resolv' # # RubySkynet Registry Client # # Keeps a local copy of the Skynet Registry @@ -15,12 +16,12 @@ include SyncAttr # Service Registry has the following format # Key: [String] 'service_name/version/region' # Value: [Array<String>] 'host:port', 'host:port' - sync_cattr_accessor :service_registry do - start_monitoring + sync_cattr_reader :service_registry do + start end @@on_server_removed_callbacks = ThreadSafe::Hash.new @@monitor_thread = nil @@ -59,28 +60,36 @@ :connect_retry_interval => 1, :connect_retry_count => 30 } end - # Lazy initialize Doozer Client Connection pool - sync_cattr_reader :doozer_pool do - GenePool.new( - :name =>"Doozer Connection Pool", - :pool_size => 5, - :timeout => 30, - :warn_timeout => 5, - :idle_timeout => 600, - :logger => logger, - :close_proc => :close - ) do - Doozer::Client.new(doozer_config) + # Register the supplied service at this Skynet Server host and Port + def self.register_service(name, version, region, hostname, port) + config = { + "Config" => { + "UUID" => "#{hostname}:#{port}-#{$$}-#{name}-#{version}", + "Name" => name, + "Version" => version.to_s, + "Region" => region, + "ServiceAddr" => { + "IPAddress" => hostname, + "Port" => port, + "MaxPort" => port + 999 + }, + }, + "Registered" => true + } + doozer_pool.with_connection do |doozer| + doozer["/services/#{name}/#{version}/#{region}/#{hostname}/#{port}"] = MultiJson.encode(config) end end - # Logging instance for this class - sync_cattr_reader :logger do - SemanticLogger::Logger.new(self, :debug) + # Deregister the supplied service from the Registry + def self.deregister_service(name, version, region, hostname, port) + doozer_pool.with_connection do |doozer| + doozer.delete("/services/#{name}/#{version}/#{region}/#{hostname}/#{port}") rescue nil + end end # Return a server that implements the specified service def self.server_for(service_name, version='*', region='Development') if servers = servers_for(service_name, version, region) @@ -113,129 +122,154 @@ end hosts end # Returns [Array<String>] a list of servers implementing the requested service - def self.servers_for(service_name, version='*', region='Development', remote = false) - if remote - if version != '*' - registered_implementers(service_name, version, region).map do |host| - service = host['Config']['ServiceAddr'] - "#{service['IPAddress']}:#{service['Port']}" + def self.servers_for(service_name, version='*', region='Development') + if version == '*' + # Find the highest version for the named service in this region + version = -1 + service_registry.keys.each do |key| + if match = key.match(/#{service_name}\/(\d+)\/#{region}/) + ver = match[1].to_i + version = ver if ver > version end - else - # Find the highest version of any particular service - versions = {} - registered_implementers(service_name, version, region).each do |host| - service = host['Config']['ServiceAddr'] - (versions[version.to_i] ||= []) << "#{service['IPAddress']}:#{service['Port']}" - end - # Return the servers implementing the highest version number - versions.sort.last.last end - else - if version == '*' - # Find the highest version for the named service in this region - version = -1 - service_registry.keys.each do |key| - if match = key.match(/#{service_name}\/(\d+)\/#{region}/) - ver = match[1].to_i - version = ver if ver > version - end - end - end - service_registry["#{service_name}/#{version}/#{region}"] end + if server_infos = service_registry["#{service_name}/#{version}/#{region}"] + server_infos.first.servers + end end # Invokes registered callbacks when a specific server is shutdown or terminates # Not when a server de-registers itself # The callback will only be called once and will need to be re-registered # after being called if future callbacks are required for that server def self.on_server_removed(server, &block) (@@on_server_removed_callbacks[server] ||= ThreadSafe::Array.new) << block end + IPV4_REG_EXP = /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/ + + # Returns [Integer] the score for the supplied ip_address + # Score currently ranges from 0 to 4 with 4 being the best score + # If the IP address does not match an IP v4 address a DNS lookup will + # be performed + def self.score_for_server(ip_address) + score = 0 + # Each matching element adds 1 to the score + # 192.168. 0. 0 + # 1 + # 1 + # 1 + # 1 + server_match = IPV4_REG_EXP.match(ip_address) || IPV4_REG_EXP.match(Resolv::DNS.new.getaddress(ip_address).to_s) + if server_match + @@local_match ||= IPV4_REG_EXP.match(Common.local_ip_address) + score = 0 + (1..4).each do |i| + break if @@local_match[i].to_i != server_match[i].to_i + score += 1 + end + end + score + end + ############################ - #protected + protected - # Fetch the all registry information from Doozer and set the internal registry + # Logging instance for this class + sync_cattr_reader :logger do + SemanticLogger::Logger.new(self, :debug) + end + + # Lazy initialize Doozer Client Connection pool + sync_cattr_reader :doozer_pool do + GenePool.new( + :name =>"Doozer Connection Pool", + :pool_size => 5, + :timeout => 30, + :warn_timeout => 5, + :idle_timeout => 600, + :logger => logger, + :close_proc => :close + ) do + Doozer::Client.new(doozer_config) + end + end + + # Fetch the all registry information from Doozer and sets the internal registry # Also starts the monitoring thread to keep the registry up to date - def self.start_monitoring + def self.start + # Populate internal registry from doozer server + # Holds a lock in this process on the service_registry so that only + # this thread will pre-populate the local copy of the registry registry = ThreadSafe::Hash.new revision = nil doozer_pool.with_connection do |doozer| revision = doozer.current_revision doozer.walk(DOOZER_SERVICES_PATH, revision).each do |node| - # path: "/services/TutorialService/1/Development/127.0.0.1/9000" - e = node.path.split('/') - - # Key: [String] 'service_name/version/region' - key = "#{e[2]}/#{e[3]}/#{e[4]}" - server = "#{e[5]}:#{e[6]}" - - if node.value.strip.size > 0 - entry = MultiJson.load(node.value) - if entry['Registered'] - # Value: [Array<String>] 'host:port', 'host:port' - servers = (registry[key] ||= ThreadSafe::Array.new) - servers << server unless servers.include?(server) - logger.debug "#start_monitoring Add Service: #{key} => #{server}" - end - end + service_info_change(registry, node.path, node.value) end end # Start monitoring thread to keep the registry up to date - @@monitor_thread = Thread.new { self.watch(revision + 1) } + @@monitor_thread = Thread.new { watch_registry(revision + 1) } + + # Cleanup when process exits + at_exit do + if @@monitor_thread + @@monitor_thread.kill + @@monitor_thread.join + @@monitor_thread = nil + end + doozer_pool.close + end registry end # Waits for any updates from Doozer and updates the internal service registry - def self.watch(revision) + def self.watch_registry(revision) logger.info "Start monitoring #{DOOZER_SERVICES_PATH}" # This thread must use its own dedicated doozer connection doozer = Doozer::Client.new(doozer_config) - doozer.watch(DOOZER_SERVICES_PATH, revision) do |node| - # path: "/services/TutorialService/1/Development/127.0.0.1/9000" - e = node.path.split('/') - # Key: [String] 'service_name/version/region' - key = "#{e[2]}/#{e[3]}/#{e[4]}" - server = "#{e[5]}:#{e[6]}" - - if node.value.strip.size > 0 - entry = MultiJson.load(node.value) - if entry['Registered'] - # Value: [Array<String>] 'host:port', 'host:port' - servers = (@@service_registry[key] ||= ThreadSafe::Array.new) - servers << server unless servers.include?(server) - logger.debug "#monitor Add/Update Service: #{key} => #{server}" - else - logger.debug "#monitor Service deregistered, remove: #{key} => #{server}" - if @@service_registry[key] - @@service_registry[key].delete(server) - @@service_registry.delete(key) if @@service_registry[key].size == 0 - end - end - else - # Service has stopped and needs to be removed - logger.debug "#monitor Service stopped, remove: #{key} => #{server}" - if @@service_registry[key] - @@service_registry[key].delete(server) - @@service_registry.delete(key) if @@service_registry[key].size == 0 - server_removed(server) - end - end - logger.debug "Updated registry", @@service_registry + # Watch for any changes + doozer.watch(DOOZER_SERVICES_PATH, revision) do |node| + service_info_change(service_registry, node.path, node.value) + logger.trace "Updated registry", service_registry end logger.info "Stopping monitoring thread normally" rescue Exception => exc logger.error "Exception in monitoring thread", exc ensure - logger.info "Stopped monitoring" + doozer.close if doozer + logger.info "Stopped monitoring for changes in the doozer registry" end + # Service information changed in doozer, so update internal registry + def self.service_info_change(registry, path, value) + # path from doozer: "/services/TutorialService/1/Development/127.0.0.1/9000" + e = path.split('/') + + # Key: [String] 'service_name/version/region' + key = "#{e[2]}/#{e[3]}/#{e[4]}" + hostname, port = e[5], e[6] + + if value.strip.size > 0 + entry = MultiJson.load(value) + if entry['Registered'] + add_server(registry, key, hostname, port) + else + # Service just de-registered + remove_server(registry, key, hostname, port, false) + end + else + # Service has stopped and needs to be removed + remove_server(registry, key, hostname, port, true) + end + end + # Invoke any registered callbacks for the specific server def self.server_removed(server) if callbacks = @@on_server_removed_callbacks.delete(server) callbacks.each do |block| begin @@ -243,9 +277,98 @@ block.call(server) rescue Exception => exc logger.error("Exception during a callback for server: #{server}", exc) end end + end + end + + # :score: [Integer] Score + # :servers: [Array<String>] 'host:port', 'host:port' + ServerInfo = Struct.new(:score, :servers ) + + # Format of the internal services registry + # key: [String] "<service_name>/<version>/<region>" + # value: [ServiceInfo, ServiceInfo] + # Sorted by highest score first + + # Add the host to the registry based on it's score + def self.add_server(registry, key, hostname, port) + server = "#{hostname}:#{port}" + logger.debug "#monitor Add/Update Service: #{key} => #{server.inspect}" + + server_infos = (registry[key] ||= ThreadSafe::Array.new) + + # If already present, then nothing to do + server_info = server_infos.find{|si| si.server == server} + return server_info if server_info + + # Look for the same score with a different server + score = score_for_server(hostname) + if server_info = server_infos.find{|si| si.score == score} + server_info.servers << server + return server_info + end + + # New score + servers = ThreadSafe::Array.new + servers << server + server_info = ServerInfo.new(score, servers) + + # Insert into Array in order of score + if index = server_infos.find_index {|si| si.score <= score} + server_infos.insert(index, server_info) + else + server_infos << server_info + end + server_info + end + + # Remove the host from the registry based + # Returns the server instance if it was removed + def self.remove_server(registry, key, hostname, port, notify) + server = "#{hostname}:#{port}" + logger.debug "Remove Service: #{key} => #{server.inspect}" + server_info = nil + if server_infos = registry[key] + server_infos.each do |si| + if si.servers.delete(server) + server_info = si + break + end + end + + # Found server + if server_info + # Cleanup if no more servers in server list + server_infos.delete(server_info) if server_info.servers.size == 0 + + # Cleanup if no more server infos + registry.delete(key) if server_infos.size == 0 + + server_removed(server) if notify + end + end + server_info + end + + # Check doozer for servers matching supplied criteria + # Code unused, consider deleting + def self.remote_servers_for(service_name, version='*', region='Development') + if version != '*' + registered_implementers(service_name, version, region).map do |host| + service = host['Config']['ServiceAddr'] + "#{service['IPAddress']}:#{service['Port']}" + end + else + # Find the highest version of any particular service + versions = {} + registered_implementers(service_name, version, region).each do |host| + service = host['Config']['ServiceAddr'] + (versions[version.to_i] ||= []) << "#{service['IPAddress']}:#{service['Port']}" + end + # Return the servers implementing the highest version number + versions.sort.last.last end end end end