lib/redis_failover/client.rb in redis_failover-0.8.2 vs lib/redis_failover/client.rb in redis_failover-0.8.3
- old
+ new
@@ -146,10 +146,30 @@
def manual_failover(options = {})
Manual.failover(zk, options)
self
end
+ # Gracefully performs a shutdown of this client. This method is
+ # mostly useful when the client is used in a forking environment.
+ # When a fork occurs, you can call this method in an after_fork hook,
+ # and then create a new instance of the client. The underlying
+ # ZooKeeper client and redis clients will be closed.
+ def shutdown
+ @zk.close! if @zk
+ @zk = nil
+ purge_clients
+ end
+
+ # Reconnect will first perform a shutdown of the underlying redis clients.
+ # Next, it attempts to reopen the ZooKeeper client and re-create the redis
+ # clients after it fetches the most up-to-date list from ZooKeeper.
+ def reconnect
+ purge_clients
+ @zk ? @zk.reopen : setup_zk
+ build_clients
+ end
+
private
# Sets up the underlying ZooKeeper connection.
def setup_zk
@zk = ZK.new(@zkservers)
@@ -189,26 +209,17 @@
# @param [Array] args the arguments to pass to the method
# @param [Proc] block an optional block to pass to the method
# @return [Object] the result of dispatching the command
def dispatch(method, *args, &block)
unless recently_heard_from_node_manager?
- @lock.synchronize do
- reconnect_zk
- build_clients
- end
+ build_clients
end
verify_supported!(method)
tries = 0
begin
- if REDIS_READ_OPS.include?(method)
- # send read operations to a slave
- slave.send(method, *args, &block)
- else
- # direct everything else to master
- master.send(method, *args, &block)
- end
+ client_for(method).send(method, *args, &block)
rescue *CONNECTIVITY_ERRORS => ex
logger.error("Error while handling `#{method}` - #{ex.inspect}")
logger.error(ex.backtrace.join("\n"))
if tries < @max_retries
@@ -216,10 +227,16 @@
build_clients
sleep(RETRY_WAIT_TIME)
retry
end
raise
+ ensure
+ if info = Thread.current[:last_operation_info]
+ if info[:method] == method
+ Thread.current[:last_operation_info] = nil
+ end
+ end
end
end
# Returns the currently known master.
#
@@ -249,32 +266,22 @@
# Builds the Redis clients for the currently known master/slaves.
# The current master/slaves are fetched via ZooKeeper.
def build_clients
@lock.synchronize do
- retried = false
-
begin
nodes = fetch_nodes
return unless nodes_changed?(nodes)
purge_clients
logger.info("Building new clients for nodes #{nodes}")
new_master = new_clients_for(nodes[:master]).first if nodes[:master]
new_slaves = new_clients_for(*nodes[:slaves])
@master = new_master
@slaves = new_slaves
- rescue ZK::Exceptions::InterruptedSession => ex
- logger.error("ZK error while attempting to build clients: #{ex.inspect}")
- logger.error(ex.backtrace.join("\n"))
-
- # when ZK is disconnected, retry once
- unless retried
- reconnect_zk
- retried = true
- retry
- end
+ rescue
+ purge_clients
raise
end
end
end
@@ -285,10 +292,14 @@
data = @zk.get(@znode, :watch => true).first
nodes = symbolize_keys(decode(data))
logger.debug("Fetched nodes: #{nodes}")
nodes
+ rescue Zookeeper::Exceptions::InheritedConnectionError => ex
+ logger.debug { "Caught #{ex.class} '#{ex.message}' reconstructing the zk instance" }
+ @zk.reopen
+ retry
end
# Builds new Redis clients for the specified nodes.
#
# @param [Array<String>] nodes the array of redis host:port pairs
@@ -401,8 +412,36 @@
# @return [Boolean] indicates if we recently heard from the Node Manager
def recently_heard_from_node_manager?
return false unless @last_znode_timestamp
Time.now - @last_znode_timestamp <= ZNODE_UPDATE_TIMEOUT
+ end
+
+ # Returns the client to use for the specified operation.
+ #
+ # @param [Symbol] method the method for which to retrieve a client
+ # @return [Redis] a redis client to use
+ # @note
+ # This method stores the last client/method used to handle the case
+ # where the same RedisFailover::Client instance is referenced by a
+ # block passed to multi.
+ def client_for(method)
+ if info = Thread.current[:last_operation_info]
+ return info[:client]
+ elsif REDIS_READ_OPS.include?(method)
+ # send read operations to a slave
+ Thread.current[:last_operation_info] = {
+ :client => slave,
+ :method => method
+ }
+ else
+ # direct everything else to master
+ Thread.current[:last_operation_info] = {
+ :client => master,
+ :method => method
+ }
+ end
+
+ Thread.current[:last_operation_info][:client]
end
end
end