require 'sync_attr' require 'multi_json' require 'thread_safe' require 'gene_pool' require 'resolv' # # RubySkynet Registry Client # # Keeps a local copy of the Skynet Registry # # Subscribes to Registry changes and the internal copy up to date # module RubySkynet class Registry include SyncAttr # Service Registry has the following format # Key: [String] 'service_name/version/region' # Value: [Array] 'host:port', 'host:port' sync_cattr_reader :service_registry do start end @@on_server_removed_callbacks = ThreadSafe::Hash.new @@monitor_thread = nil DOOZER_SERVICES_PATH = "/services/*/*/*/*/*" # Default doozer configuration # To replace this default, set the config as follows: # RubySkynet::Client.doozer_config = { .... } # # :servers [Array of String] # Array of URL's of doozer servers to connect to with port numbers # ['server1:2000', 'server2:2000'] # # The second server will only be attempted once the first server # cannot be connected to or has timed out on connect # A read failure or timeout will not result in switching to the second # server, only a connection failure or during an automatic reconnect # # :read_timeout [Float] # Time in seconds to timeout on read # Can be overridden by supplying a timeout in the read call # # :connect_timeout [Float] # Time in seconds to timeout when trying to connect to the server # # :connect_retry_count [Fixnum] # Number of times to retry connecting when a connection fails # # :connect_retry_interval [Float] # Number of seconds between connection retry attempts after the first failed attempt sync_cattr_accessor :doozer_config do { :servers => ['127.0.0.1:8046'], :read_timeout => 5, :connect_timeout => 3, :connect_retry_interval => 1, :connect_retry_count => 30 } end # Register the supplied service at this Skynet Server host and Port def self.register_service(name, version, region, hostname, port) config = { "Config" => { "UUID" => "#{hostname}:#{port}-#{$$}-#{name}-#{version}", "Name" => name, "Version" => version.to_s, "Region" => region, "ServiceAddr" => { "IPAddress" => hostname, "Port" => port, "MaxPort" => port + 999 }, }, "Registered" => true } doozer_pool.with_connection do |doozer| doozer["/services/#{name}/#{version}/#{region}/#{hostname}/#{port}"] = MultiJson.encode(config) end end # Deregister the supplied service from the Registry def self.deregister_service(name, version, region, hostname, port) doozer_pool.with_connection do |doozer| doozer.delete("/services/#{name}/#{version}/#{region}/#{hostname}/#{port}") rescue nil end end # Return a server that implements the specified service def self.server_for(service_name, version='*', region='Development') if servers = servers_for(service_name, version, region) # Randomly select one of the servers offering the service servers[rand(servers.size)] else msg = "No servers available for service: #{service_name} with version: #{version} in region: #{region}" logger.warn msg raise ServiceUnavailable.new(msg) end end # Returns [Array] of the hostname and port pair [String] that implements a particular service # Performs a doozer lookup to find the servers # # service_name: # Name of the service to lookup # version: # Version of service to locate # Default: All versions # region: # Region to look for the service in def self.registered_implementers(service_name='*', version='*', region='Development') hosts = [] doozer_pool.with_connection do |doozer| doozer.walk("/services/#{service_name}/#{version}/#{region}/*/*").each do |node| entry = MultiJson.load(node.value) hosts << entry if entry['Registered'] end end hosts end # Returns [Array] a list of servers implementing the requested service def self.servers_for(service_name, version='*', region='Development') if version == '*' # Find the highest version for the named service in this region version = -1 service_registry.keys.each do |key| if match = key.match(/#{service_name}\/(\d+)\/#{region}/) ver = match[1].to_i version = ver if ver > version end end end if server_infos = service_registry["#{service_name}/#{version}/#{region}"] server_infos.first.servers end end # Invokes registered callbacks when a specific server is shutdown or terminates # Not when a server de-registers itself # The callback will only be called once and will need to be re-registered # after being called if future callbacks are required for that server def self.on_server_removed(server, &block) (@@on_server_removed_callbacks[server] ||= ThreadSafe::Array.new) << block end IPV4_REG_EXP = /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/ # Returns [Integer] the score for the supplied ip_address # Score currently ranges from 0 to 4 with 4 being the best score # If the IP address does not match an IP v4 address a DNS lookup will # be performed def self.score_for_server(ip_address) score = 0 # Each matching element adds 1 to the score # 192.168. 0. 0 # 1 # 1 # 1 # 1 server_match = IPV4_REG_EXP.match(ip_address) || IPV4_REG_EXP.match(Resolv::DNS.new.getaddress(ip_address).to_s) if server_match @@local_match ||= IPV4_REG_EXP.match(Common.local_ip_address) score = 0 (1..4).each do |i| break if @@local_match[i].to_i != server_match[i].to_i score += 1 end end score end ############################ protected # Logging instance for this class sync_cattr_reader :logger do SemanticLogger::Logger.new(self, :debug) end # Lazy initialize Doozer Client Connection pool sync_cattr_reader :doozer_pool do GenePool.new( :name =>"Doozer Connection Pool", :pool_size => 5, :timeout => 30, :warn_timeout => 5, :idle_timeout => 600, :logger => logger, :close_proc => :close ) do Doozer::Client.new(doozer_config) end end # Fetch the all registry information from Doozer and sets the internal registry # Also starts the monitoring thread to keep the registry up to date def self.start # Populate internal registry from doozer server # Holds a lock in this process on the service_registry so that only # this thread will pre-populate the local copy of the registry registry = ThreadSafe::Hash.new revision = nil doozer_pool.with_connection do |doozer| revision = doozer.current_revision doozer.walk(DOOZER_SERVICES_PATH, revision).each do |node| service_info_change(registry, node.path, node.value) end end # Start monitoring thread to keep the registry up to date @@monitor_thread = Thread.new { watch_registry(revision + 1) } # Cleanup when process exits at_exit do if @@monitor_thread @@monitor_thread.kill @@monitor_thread.join @@monitor_thread = nil end doozer_pool.close end registry end # Waits for any updates from Doozer and updates the internal service registry def self.watch_registry(revision) logger.info "Start monitoring #{DOOZER_SERVICES_PATH}" # This thread must use its own dedicated doozer connection doozer = Doozer::Client.new(doozer_config) # Watch for any changes doozer.watch(DOOZER_SERVICES_PATH, revision) do |node| service_info_change(service_registry, node.path, node.value) logger.trace "Updated registry", service_registry end logger.info "Stopping monitoring thread normally" rescue Exception => exc logger.error "Exception in monitoring thread", exc ensure doozer.close if doozer logger.info "Stopped monitoring for changes in the doozer registry" end # Service information changed in doozer, so update internal registry def self.service_info_change(registry, path, value) # path from doozer: "/services/TutorialService/1/Development/127.0.0.1/9000" e = path.split('/') # Key: [String] 'service_name/version/region' key = "#{e[2]}/#{e[3]}/#{e[4]}" hostname, port = e[5], e[6] if value.strip.size > 0 entry = MultiJson.load(value) if entry['Registered'] add_server(registry, key, hostname, port) else # Service just de-registered remove_server(registry, key, hostname, port, false) end else # Service has stopped and needs to be removed remove_server(registry, key, hostname, port, true) end end # Invoke any registered callbacks for the specific server def self.server_removed(server) if callbacks = @@on_server_removed_callbacks.delete(server) callbacks.each do |block| begin logger.info "Calling callback for server: #{server}" block.call(server) rescue Exception => exc logger.error("Exception during a callback for server: #{server}", exc) end end end end # :score: [Integer] Score # :servers: [Array] 'host:port', 'host:port' ServerInfo = Struct.new(:score, :servers ) # Format of the internal services registry # key: [String] "//" # value: [ServiceInfo, ServiceInfo] # Sorted by highest score first # Add the host to the registry based on it's score def self.add_server(registry, key, hostname, port) server = "#{hostname}:#{port}" logger.debug "#monitor Add/Update Service: #{key} => #{server.inspect}" server_infos = (registry[key] ||= ThreadSafe::Array.new) # If already present, then nothing to do server_info = server_infos.find{|si| si.server == server} return server_info if server_info # Look for the same score with a different server score = score_for_server(hostname) if server_info = server_infos.find{|si| si.score == score} server_info.servers << server return server_info end # New score servers = ThreadSafe::Array.new servers << server server_info = ServerInfo.new(score, servers) # Insert into Array in order of score if index = server_infos.find_index {|si| si.score <= score} server_infos.insert(index, server_info) else server_infos << server_info end server_info end # Remove the host from the registry based # Returns the server instance if it was removed def self.remove_server(registry, key, hostname, port, notify) server = "#{hostname}:#{port}" logger.debug "Remove Service: #{key} => #{server.inspect}" server_info = nil if server_infos = registry[key] server_infos.each do |si| if si.servers.delete(server) server_info = si break end end # Found server if server_info # Cleanup if no more servers in server list server_infos.delete(server_info) if server_info.servers.size == 0 # Cleanup if no more server infos registry.delete(key) if server_infos.size == 0 server_removed(server) if notify end end server_info end # Check doozer for servers matching supplied criteria # Code unused, consider deleting def self.remote_servers_for(service_name, version='*', region='Development') if version != '*' registered_implementers(service_name, version, region).map do |host| service = host['Config']['ServiceAddr'] "#{service['IPAddress']}:#{service['Port']}" end else # Find the highest version of any particular service versions = {} registered_implementers(service_name, version, region).each do |host| service = host['Config']['ServiceAddr'] (versions[version.to_i] ||= []) << "#{service['IPAddress']}:#{service['Port']}" end # Return the servers implementing the highest version number versions.sort.last.last end end end end