lib/redis_failover/client.rb in redis_failover-0.8.7 vs lib/redis_failover/client.rb in redis_failover-0.8.8
- old
+ new
@@ -103,11 +103,13 @@
@db = options[:db]
@retry = options[:retry_failure] || true
@max_retries = @retry ? options.fetch(:max_retries, 3) : 0
@master = nil
@slaves = []
+ @node_addresses = {}
@lock = Monitor.new
+ @current_client_key = "current-client-#{self.object_id}"
setup_zk
build_clients
end
# Dispatches redis operations to master/slaves.
@@ -226,15 +228,11 @@
sleep(RETRY_WAIT_TIME)
retry
end
raise
ensure
- if info = Thread.current[:last_operation_info]
- if info[:method] == method
- Thread.current[:last_operation_info] = nil
- end
- end
+ free_client
end
end
# Returns the currently known master.
#
@@ -310,10 +308,11 @@
opts.update(:password => @password) if @password
client = Redis.new(opts)
if @namespace
client = Redis::Namespace.new(@namespace, :redis => client)
end
+ @node_addresses[client] = node
client
end
end
# @return [String] a friendly name for current master
@@ -362,11 +361,11 @@
#
# @param [Redis] node a redis client
# @return [String] the address for the node
def address_for(node)
return unless node
- "#{node.client.host}:#{node.client.port}"
+ @node_addresses[node]
end
# Determines if the currently known redis servers is different
# from the nodes returned by ZooKeeper.
#
@@ -398,10 +397,11 @@
@lock.synchronize do
logger.info("Purging current redis clients")
disconnect(@master, *@slaves)
@master = nil
@slaves = []
+ @node_addresses = {}
end
end
# Updates timestamp when an event is received by the Node Manager.
def update_znode_timestamp
@@ -412,34 +412,29 @@
def recently_heard_from_node_manager?
return false unless @last_znode_timestamp
Time.now - @last_znode_timestamp <= ZNODE_UPDATE_TIMEOUT
end
- # Returns the client to use for the specified operation.
+ # Acquires a client to use for the specified operation.
#
# @param [Symbol] method the method for which to retrieve a client
# @return [Redis] a redis client to use
# @note
- # This method stores the last client/method used to handle the case
- # where the same RedisFailover::Client instance is referenced by a
- # block passed to multi.
+ # This method stores a stack of clients used to handle the case
+ # where the same RedisFailover::Client instance is referenced by
+ # nested blocks (e.g., block passed to multi).
def client_for(method)
- if info = Thread.current[:last_operation_info]
- return info[:client]
- elsif REDIS_READ_OPS.include?(method)
- # send read operations to a slave
- Thread.current[:last_operation_info] = {
- :client => slave,
- :method => method
- }
- else
- # direct everything else to master
- Thread.current[:last_operation_info] = {
- :client => master,
- :method => method
- }
- end
+ stack = Thread.current[@current_client_key] ||= []
+ client = stack.last || (REDIS_READ_OPS.include?(method) ? slave : master)
+ stack << client
+ client
+ end
- Thread.current[:last_operation_info][:client]
+ # Pops a client from the thread-local client stack.
+ def free_client
+ if stack = Thread.current[@current_client_key]
+ stack.pop
+ end
+ nil
end
end
end