lib/redis_failover/client.rb in redis_failover-0.4.0 vs lib/redis_failover/client.rb in redis_failover-0.5.0

- old
+ new

@@ -1,15 +1,13 @@
 require 'set'
-require 'open-uri'
 
 module RedisFailover
   # Redis failover-aware client.
   class Client
     include Util
 
     RETRY_WAIT_TIME = 3
-    REDIS_ERRORS = Errno.constants.map { |c| Errno.const_get(c) }.freeze
     REDIS_READ_OPS = Set[
       :echo,
       :exists,
       :get,
       :getbit,
@@ -65,33 +63,31 @@
 
     # Creates a new failover redis client.
     #
     # Options:
     #
-    # :host - redis failover server host (required)
-    # :port - redis failover server port (required)
-    # :password - optional password for redis nodes
-    # :namespace - optional namespace for redis nodes
-    # :logger - optional logger override
+    # :zkservers - comma-separated zookeeper host:port pairs (required)
+    # :znode_path - the Znode path override for redis server list (optional)
+    # :password - password for redis nodes (optional)
+    # :namespace - namespace for redis nodes (optional)
+    # :logger - logger override (optional)
     # :retry_failure - indicate if failures should be retried (default true)
     # :max_retries - max retries for a failure (default 5)
     #
     def initialize(options = {})
-      unless options.values_at(:host, :port).all?
-        raise ArgumentError, ':host and :port options required'
-      end
-
       Util.logger = options[:logger] if options[:logger]
+      @zkservers = options.fetch(:zkservers) { raise ArgumentError, ':zkservers required'}
+      @znode = options[:znode_path] || Util::DEFAULT_ZNODE_PATH
       @namespace = options[:namespace]
       @password = options[:password]
       @retry = options[:retry_failure] || true
       @max_retries = @retry ? options.fetch(:max_retries, 3) : 0
-      @server_url = "http://#{options[:host]}:#{options[:port]}/redis_servers"
       @master = nil
       @slaves = []
+      @lock = Mutex.new
+      setup_zookeeper_client
       build_clients
-      start_background_monitor
     end
 
     def method_missing(method, *args, &block)
       if redis_operation?(method)
         dispatch(method, *args, &block)
@@ -109,10 +105,34 @@
     end
     alias_method :to_s, :inspect
 
     private
 
+    def setup_zookeeper_client
+      @zkclient = ZkClient.new(@zkservers)
+
+      # when session expires / we are disconnected, purge client list
+      @zkclient.on_session_expiration do
+        @lock.synchronize { purge_clients }
+      end
+      @zkclient.event_handler.register_state_handler(:connecting) do
+        @lock.synchronize { purge_clients }
+      end
+
+      # register a watcher for future changes
+      @zkclient.watcher.register(@znode) do |event|
+        if event.node_created? || event.node_changed?
+          build_clients
+        elsif event.node_deleted?
+          @zkclient.stat( @znode, :watch => true)
+          @lock.synchronize { purge_clients }
+        else
+          logger.error("Unknown ZK node event: #{event.inspect}")
+        end
+      end
+    end
+
     def redis_operation?(method)
       Redis.public_instance_methods(false).include?(method)
     end
 
     def dispatch(method, *args, &block)
@@ -125,75 +145,75 @@
           slave.send(method, *args, &block)
         else
           # direct everything else to master
           master.send(method, *args, &block)
         end
-      rescue Error, *REDIS_ERRORS
-        logger.error("No suitable node available for operation `#{method}.`")
-        build_clients
-
+      rescue *ALL_ERRORS => ex
+        logger.error("Error while handling operation `#{method}` - #{ex.message}")
         if tries < @max_retries
           tries += 1
+          build_clients
           sleep(RETRY_WAIT_TIME) && retry
         end
 
         raise
       end
     end
 
     def master
-      master = @master
+      master = @lock.synchronize { @master }
       if master
         verify_role!(master, :master)
         return master
       end
       raise NoMasterError
     end
 
     def slave
       # pick a slave, if none available fallback to master
-      if slave = @slaves.sample
+      if slave = @lock.synchronize { @slaves.sample }
         verify_role!(slave, :slave)
         return slave
       end
       master
     end
 
     def build_clients
-      tries = 0
+      @lock.synchronize do
+        tries = 0
 
-      begin
-        logger.info('Checking for new redis nodes.')
-        nodes = fetch_nodes
-        return unless nodes_changed?(nodes)
+        begin
+          nodes = fetch_nodes
+          return unless nodes_changed?(nodes)
 
-        logger.info('Node change detected, rebuilding clients.')
-        master = new_clients_for(nodes[:master]).first if nodes[:master]
-        slaves = new_clients_for(*nodes[:slaves])
+          purge_clients
+          logger.info("Building new clients for nodes #{nodes}")
+          new_master = new_clients_for(nodes[:master]).first if nodes[:master]
+          new_slaves = new_clients_for(*nodes[:slaves])
+          @master = new_master
+          @slaves = new_slaves
+        rescue *ALL_ERRORS => ex
+          purge_clients
+          logger.error("Failed to fetch nodes from #{@zkservers} - #{ex.message}")
+          logger.error(ex.backtrace.join("\n"))
 
-        # once clients are successfully created, swap the references
-        @master = master
-        @slaves = slaves
-      rescue => ex
-        logger.error("Failed to fetch nodes from #{@server_url} - #{ex.message}")
-        logger.error(ex.backtrace.join("\n"))
+          if tries < @max_retries
+            tries += 1
+            sleep(RETRY_WAIT_TIME) && retry
+          end
 
-        if tries < @max_retries
-          tries += 1
-          sleep(RETRY_WAIT_TIME) && retry
+          raise
         end
-
-        raise FailoverServerUnavailableError.new(@server_url)
       end
     end
 
     def fetch_nodes
-      open(@server_url) do |io|
-        nodes = symbolize_keys(MultiJson.decode(io))
-        logger.info("Fetched nodes: #{nodes}")
-        nodes
-      end
+      data = @zkclient.get(@znode, :watch => true).first
+      nodes = symbolize_keys(decode(data))
+      logger.debug("Fetched nodes: #{nodes}")
+
+      nodes
     end
 
     def new_clients_for(*nodes)
       nodes.map do |node|
         host, port = node.split(':')
@@ -241,22 +261,25 @@
       return true if address_for(@master) != new_nodes[:master]
       return true if different?(addresses_for(@slaves), new_nodes[:slaves])
       false
     end
 
-    # Spawns a background thread to periodically fetch the latest
-    # set of nodes from the redis failover server.
-    def start_background_monitor
-      Thread.new do
-        loop do
-          sleep(10)
+    def disconnect(*connections)
+      connections.each do |conn|
+        if conn
           begin
-            build_clients
-          rescue => ex
-            logger.error("Failed to poll for new nodes from #{@server_url} - #{ex.message}")
-            logger.error(ex.backtrace.join("\n"))
+            conn.client.disconnect
+          rescue
+            # best effort
           end
         end
       end
+    end
+
+    def purge_clients
+      logger.info("Purging current redis clients")
+      disconnect(@master, *@slaves)
+      @master = nil
+      @slaves = []
     end
   end
 end