lib/redis_failover/client.rb in redis_failover-0.2.0 vs lib/redis_failover/client.rb in redis_failover-0.3.0

- old
+ new

@@ -84,15 +84,15 @@ @namespace = options[:namespace] @password = options[:password] @retry = options[:retry_failure] || true @max_retries = @retry ? options.fetch(:max_retries, 3) : 0 @server_url = "http://#{options[:host]}:#{options[:port]}/redis_servers" - @redis_servers = nil @master = nil @slaves = [] @lock = Mutex.new build_clients + start_background_monitor end def method_missing(method, *args, &block) if redis_operation?(method) dispatch(method, *args, &block) @@ -104,11 +104,11 @@ def respond_to?(method) redis_operation?(method) || super end def inspect - "#<RedisFailover::Client - master: #{master_info}, slaves: #{slaves_info})>" + "#<RedisFailover::Client - master: #{master_name}, slaves: #{slave_names})>" end alias_method :to_s, :inspect private @@ -139,13 +139,14 @@ raise end end def master - if @master - verify_role!(@master, :master) - return @master + master = @master + if master + verify_role!(master, :master) + return master end raise NoMasterError end def slave @@ -160,20 +161,23 @@ def build_clients @lock.synchronize do tries = 0 begin - logger.info('Attempting to fetch nodes and build redis clients.') - servers = fetch_redis_servers - master = new_clients_for(servers[:master]).first if servers[:master] - slaves = new_clients_for(*servers[:slaves]) + logger.info('Checking for new redis nodes.') + nodes = fetch_nodes + return unless nodes_changed?(nodes) + logger.info('Node change detected, rebuilding clients.') + master = new_clients_for(nodes[:master]).first if nodes[:master] + slaves = new_clients_for(*nodes[:slaves]) + # once clients are successfully created, swap the references @master = master @slaves = slaves rescue => ex - logger.error("Failed to fetch servers from #{@server_url} - #{ex.message}") + logger.error("Failed to fetch nodes from #{@server_url} - #{ex.message}") logger.error(ex.backtrace.join("\n")) if tries < @max_retries tries += 1 sleep(RETRY_WAIT_TIME) && retry @@ -182,15 +186,15 @@ raise FailoverServerUnreachableError.new(@server_url) end end end - def fetch_redis_servers + def fetch_nodes open(@server_url) do |io| - servers = symbolize_keys(MultiJson.decode(io)) - logger.info("Fetched servers: #{servers}") - servers + nodes = symbolize_keys(MultiJson.decode(io)) + logger.info("Fetched nodes: #{nodes}") + nodes end end def new_clients_for(*nodes) nodes.map do |node| @@ -201,28 +205,54 @@ end client end end - def master_info - return "none" unless @master - name_for(@master) + def master_name + address_for(@master) || 'none' end - def slaves_info - return "none" if @slaves.empty? - @slaves.map { |slave| name_for(slave) }.join(', ') + def slave_names + return 'none' if @slaves.empty? + addresses_for(@slaves).join(', ') end def verify_role!(node, role) current_role = node.info['role'] if current_role.to_sym != role - raise InvalidNodeRoleError.new(name_for(node), role, current_role) + raise InvalidNodeRoleError.new(address_for(node), role, current_role) end role end - def name_for(node) + def addresses_for(nodes) + nodes.map { |node| address_for(node) } + end + + def address_for(node) + return unless node "#{node.client.host}:#{node.client.port}" + end + + def nodes_changed?(new_nodes) + return true if address_for(@master) != new_nodes[:master] + return true if different?(addresses_for(@slaves), new_nodes[:slaves]) + false + end + + # Spawns a background thread to periodically fetch the latest + # set of nodes from the redis failover server. + def start_background_monitor + Thread.new do + loop do + sleep(10) + begin + build_clients + rescue => ex + logger.error("Failed to poll for new nodes from #{@server_url} - #{ex.message}") + logger.error(ex.backtrace.join("\n")) + end + end + end end end end