lib/redis_failover/client.rb in redis_failover-0.8.2 vs lib/redis_failover/client.rb in redis_failover-0.8.3

- old
+ new

@@ -146,10 +146,30 @@ def manual_failover(options = {}) Manual.failover(zk, options) self end + # Gracefully performs a shutdown of this client. This method is + # mostly useful when the client is used in a forking environment. + # When a fork occurs, you can call this method in an after_fork hook, + # and then create a new instance of the client. The underlying + # ZooKeeper client and redis clients will be closed. + def shutdown + @zk.close! if @zk + @zk = nil + purge_clients + end + + # Reconnect will first perform a shutdown of the underlying redis clients. + # Next, it attempts to reopen the ZooKeeper client and re-create the redis + # clients after it fetches the most up-to-date list from ZooKeeper. + def reconnect + purge_clients + @zk ? @zk.reopen : setup_zk + build_clients + end + private # Sets up the underlying ZooKeeper connection. def setup_zk @zk = ZK.new(@zkservers) @@ -189,26 +209,17 @@ # @param [Array] args the arguments to pass to the method # @param [Proc] block an optional block to pass to the method # @return [Object] the result of dispatching the command def dispatch(method, *args, &block) unless recently_heard_from_node_manager? - @lock.synchronize do - reconnect_zk - build_clients - end + build_clients end verify_supported!(method) tries = 0 begin - if REDIS_READ_OPS.include?(method) - # send read operations to a slave - slave.send(method, *args, &block) - else - # direct everything else to master - master.send(method, *args, &block) - end + client_for(method).send(method, *args, &block) rescue *CONNECTIVITY_ERRORS => ex logger.error("Error while handling `#{method}` - #{ex.inspect}") logger.error(ex.backtrace.join("\n")) if tries < @max_retries @@ -216,10 +227,16 @@ build_clients sleep(RETRY_WAIT_TIME) retry end raise + ensure + if info = Thread.current[:last_operation_info] + if info[:method] == method + Thread.current[:last_operation_info] = nil + end + end end end # Returns the currently known master. # @@ -249,32 +266,22 @@ # Builds the Redis clients for the currently known master/slaves. # The current master/slaves are fetched via ZooKeeper. def build_clients @lock.synchronize do - retried = false - begin nodes = fetch_nodes return unless nodes_changed?(nodes) purge_clients logger.info("Building new clients for nodes #{nodes}") new_master = new_clients_for(nodes[:master]).first if nodes[:master] new_slaves = new_clients_for(*nodes[:slaves]) @master = new_master @slaves = new_slaves - rescue ZK::Exceptions::InterruptedSession => ex - logger.error("ZK error while attempting to build clients: #{ex.inspect}") - logger.error(ex.backtrace.join("\n")) - - # when ZK is disconnected, retry once - unless retried - reconnect_zk - retried = true - retry - end + rescue + purge_clients raise end end end @@ -285,10 +292,14 @@ data = @zk.get(@znode, :watch => true).first nodes = symbolize_keys(decode(data)) logger.debug("Fetched nodes: #{nodes}") nodes + rescue Zookeeper::Exceptions::InheritedConnectionError => ex + logger.debug { "Caught #{ex.class} '#{ex.message}' reconstructing the zk instance" } + @zk.reopen + retry end # Builds new Redis clients for the specified nodes. # # @param [Array<String>] nodes the array of redis host:port pairs @@ -401,8 +412,36 @@ # @return [Boolean] indicates if we recently heard from the Node Manager def recently_heard_from_node_manager? return false unless @last_znode_timestamp Time.now - @last_znode_timestamp <= ZNODE_UPDATE_TIMEOUT + end + + # Returns the client to use for the specified operation. + # + # @param [Symbol] method the method for which to retrieve a client + # @return [Redis] a redis client to use + # @note + # This method stores the last client/method used to handle the case + # where the same RedisFailover::Client instance is referenced by a + # block passed to multi. + def client_for(method) + if info = Thread.current[:last_operation_info] + return info[:client] + elsif REDIS_READ_OPS.include?(method) + # send read operations to a slave + Thread.current[:last_operation_info] = { + :client => slave, + :method => method + } + else + # direct everything else to master + Thread.current[:last_operation_info] = { + :client => master, + :method => method + } + end + + Thread.current[:last_operation_info][:client] end end end