lib/ruby_skynet/registry.rb in ruby_skynet-0.4.0 vs lib/ruby_skynet/registry.rb in ruby_skynet-0.5.0

- old
+ new

@@ -14,14 +14,16 @@ module RubySkynet class Registry include SyncAttr # Service Registry has the following format - # Key: [String] 'service_name/version/region' + # Key: [String] 'name/version/region' # Value: [Array<String>] 'host:port', 'host:port' sync_cattr_reader :service_registry do - start + logger.benchmark_info "Connected to Doozer" do + start + end end @@on_server_removed_callbacks = ThreadSafe::Hash.new @@monitor_thread = nil @@ -89,55 +91,55 @@ doozer.delete("/services/#{name}/#{version}/#{region}/#{hostname}/#{port}") rescue nil end end # Return a server that implements the specified service - def self.server_for(service_name, version='*', region='Development') - if servers = servers_for(service_name, version, region) + def self.server_for(name, version='*', region='Development') + if servers = servers_for(name, version, region) # Randomly select one of the servers offering the service servers[rand(servers.size)] else - msg = "No servers available for service: #{service_name} with version: #{version} in region: #{region}" + msg = "No servers available for service: #{name} with version: #{version} in region: #{region}" logger.warn msg raise ServiceUnavailable.new(msg) end end # Returns [Array] of the hostname and port pair [String] that implements a particular service # Performs a doozer lookup to find the servers # - # service_name: + # name: # Name of the service to lookup # version: # Version of service to locate # Default: All versions # region: # Region to look for the service in - def self.registered_implementers(service_name='*', version='*', region='Development') + def self.registered_implementers(name='*', version='*', region='Development') hosts = [] doozer_pool.with_connection do |doozer| - doozer.walk("/services/#{service_name}/#{version}/#{region}/*/*").each do |node| + doozer.walk("/services/#{name}/#{version}/#{region}/*/*").each do |node| entry = MultiJson.load(node.value) hosts << entry if entry['Registered'] end end hosts end # Returns [Array<String>] a list of servers implementing the requested service - def self.servers_for(service_name, version='*', region='Development') + def self.servers_for(name, version='*', region='Development') if version == '*' # Find the highest version for the named service in this region version = -1 service_registry.keys.each do |key| - if match = key.match(/#{service_name}\/(\d+)\/#{region}/) + if match = key.match(/#{name}\/(\d+)\/#{region}/) ver = match[1].to_i version = ver if ver > version end end end - if server_infos = service_registry["#{service_name}/#{version}/#{region}"] + if server_infos = service_registry["#{name}/#{version}/#{region}"] server_infos.first.servers end end # Invokes registered callbacks when a specific server is shutdown or terminates @@ -162,11 +164,11 @@ # 1 # 1 # 1 server_match = IPV4_REG_EXP.match(ip_address) || IPV4_REG_EXP.match(Resolv::DNS.new.getaddress(ip_address).to_s) if server_match - @@local_match ||= IPV4_REG_EXP.match(Common.local_ip_address) + @@local_match ||= IPV4_REG_EXP.match(RubySkynet.local_ip_address) score = 0 (1..4).each do |i| break if @@local_match[i].to_i != server_match[i].to_i score += 1 end @@ -176,13 +178,11 @@ ############################ protected # Logging instance for this class - sync_cattr_reader :logger do - SemanticLogger::Logger.new(self, :debug) - end + include SemanticLogger::Loggable # Lazy initialize Doozer Client Connection pool sync_cattr_reader :doozer_pool do GenePool.new( :name =>"Doozer Connection Pool", @@ -248,11 +248,11 @@ # Service information changed in doozer, so update internal registry def self.service_info_change(registry, path, value) # path from doozer: "/services/TutorialService/1/Development/127.0.0.1/9000" e = path.split('/') - # Key: [String] 'service_name/version/region' + # Key: [String] 'name/version/region' key = "#{e[2]}/#{e[3]}/#{e[4]}" hostname, port = e[5], e[6] if value.strip.size > 0 entry = MultiJson.load(value) @@ -285,11 +285,11 @@ # :score: [Integer] Score # :servers: [Array<String>] 'host:port', 'host:port' ServerInfo = Struct.new(:score, :servers ) # Format of the internal services registry - # key: [String] "<service_name>/<version>/<region>" + # key: [String] "<name>/<version>/<region>" # value: [ServiceInfo, ServiceInfo] # Sorted by highest score first # Add the host to the registry based on it's score def self.add_server(registry, key, hostname, port) @@ -351,19 +351,19 @@ server_info end # Check doozer for servers matching supplied criteria # Code unused, consider deleting - def self.remote_servers_for(service_name, version='*', region='Development') + def self.remote_servers_for(name, version='*', region='Development') if version != '*' - registered_implementers(service_name, version, region).map do |host| + registered_implementers(name, version, region).map do |host| service = host['Config']['ServiceAddr'] "#{service['IPAddress']}:#{service['Port']}" end else # Find the highest version of any particular service versions = {} - registered_implementers(service_name, version, region).each do |host| + registered_implementers(name, version, region).each do |host| service = host['Config']['ServiceAddr'] (versions[version.to_i] ||= []) << "#{service['IPAddress']}:#{service['Port']}" end # Return the servers implementing the highest version number versions.sort.last.last