lib/redis_cluster/client.rb in redis_cluster-0.3.0 vs lib/redis_cluster/client.rb in redis_cluster-0.3.1
- old
+ new
@@ -2,78 +2,112 @@
module RedisCluster
class Client
- def initialize(startup_hosts, configs = {})
- @startup_hosts = startup_hosts
+ def initialize(hosts, configs = {})
+ @hosts = hosts.dup
+ @initial_hosts = hosts.dup
# Extract configuration options relevant to Redis Cluster.
# force_cluster defaults to true to match the client's behavior before
# the option existed
@force_cluster = configs.delete(:force_cluster) { |_key| true }
- # The number of times to retry a failed execute. Redis errors, `MOVE`, or
- # `ASK` are all considered failures that will count towards this tally. A
- # count of at least 2 is probably sensible because if a node disappears
- # the first try will be a Redis error, the second try retry will probably
- # be a `MOVE` (whereupon the node pool is reloaded), and it will take
- # until the third try to succeed.
- @retry_count = configs.delete(:retry_count) { |_key| 3 }
+ # An optional logger. Should respond like the standard Ruby `Logger`:
+ #
+ # http://ruby-doc.org/stdlib-2.4.0/libdoc/logger/rdoc/Logger.html
+ @logger = configs.delete(:logger) { |_key| nil }
+ # The number of times to retry when the client detects a failure that looks like
+ # it might be intermittent.
+ #
+ # It might be worth setting this to `0` if you'd like full visibility
+ # around what kinds of errors are occurring. Possibly in conjunction with
+ # your own out-of-library retry loop and/or circuit breaker.
+ @retry_count = configs.delete(:retry_count) { |_key| 2 }
+
# Any leftover configuration goes through to the pool and onto individual
# Redis clients.
@pool = Pool.new(configs)
@mutex = Mutex.new
- reload_pool_nodes(true)
+ reload_pool_nodes
end
def execute(method, args, &block)
asking = false
- last_error = nil
- retries_left = @retry_count
- try_random_node = false
+ retried = false
- # We use `>= 0` instead of `> 0` because we decrement this counter on the
- # first run.
- while retries_left >= 0
- retries_left -= 1
-
- begin
- return @pool.execute(method, args, {asking: asking, random_node: try_random_node}, &block)
-
- rescue Errno::ECONNREFUSED, Redis::TimeoutError, Redis::CannotConnectError, Errno::EACCES => e
- last_error = e
-
+ # Note that there are two levels of retry loops here.
+ #
+ # The first is for intermittent failures like "host unreachable" or
+ # timeouts. These are retried a number of times equal to @retry_count.
+ #
+ # The second is when it receives an `ASK` or `MOVED` error response from
+ # Redis. In this case the client will completely re-enter its execution
+ # loop and retry the command after any necessary prework (if `MOVED`, it
+ # will attempt to reload the node pool first). This will only ever be
+ # retried one time (see notes below). This loop uses Ruby's `retry`
+ # syntax for blocks, so keep an eye out for that in the code below.
+ #
+ # It's worth noting that if these conditions ever combine, you could see
+ # more network attempts than @retry_count. An initial execution attempt
+ # might fail intermittently a couple times before sending a `MOVED`. The
+ # client will then attempt to reload the node pool, an operation which is
+ # also retried for intermittent failures. It could then return to the
+ # main execution and fail another couple of times intermittently. This
+ # should be an extreme edge case, but it's worth considering if you're
+ # running at large scale.
+ begin
+ retry_intermittent_loop do |attempt|
# Getting an error while executing may be an indication that we've
# lost the node that we were talking to and in that case it makes
# sense to try a different node and maybe reload our node pool (if
# the new node issues a `MOVED`).
- try_random_node = true
+ try_random_node = attempt > 0
- rescue => e
- last_error = e
+ return @pool.execute(method, args, {asking: asking, random_node: try_random_node}, &block)
+ end
+ rescue Redis::CommandError => e
+ unless @logger.nil?
+ @logger.error("redis_cluster: Received error: #{e}")
+ end
- err_code = e.to_s.split.first
- raise e unless %w(MOVED ASK).include?(err_code)
+ # This is a special condition to protect against a misbehaving library
+ # or server. After we've gotten one ASK or MOVED and retried once,
+ # we'll never do so a second time. Receiving two such redirections in a
+ # row is probably indicative of a problem and we don't want to get
+ # stuck in an infinite retry loop.
+ raise if retried
+ retried = true
- if err_code == 'ASK'
- asking = true
- else
- # `MOVED` indicates a permanent redirect which means that our slot
- # mappings are stale: reload them.
- reload_pool_nodes(false)
+ err_code = e.to_s.split.first
+ case err_code
+ when 'ASK'
+ unless @logger.nil?
+ @logger.info("redis_cluster: Received ASK; retrying operation (#{e})")
end
+
+ asking = true
+ retry
+
+ when 'MOVED'
+ unless @logger.nil?
+ @logger.info("redis_cluster: Received MOVED; retrying operation (#{e})")
+ end
+
+ # `MOVED` indicates a permanent redirect which means that our slot
+ # mappings are stale: reload them, then try what we were doing again.
+ reload_pool_nodes
+ retry
+
+ else
+ raise
end
end
-
- # If we ran out of retries (the maximum number may have been set to 0),
- # surface any error that was thrown back to the caller. We'd otherwise
- # suppress the error, which would return something quite unexpected.
- raise last_error
end
Configuration.method_names.each do |method_name|
define_method method_name do |*args, &block|
execute(method_name, args, &block)
@@ -82,17 +116,35 @@
def method_missing(method, *args, &block)
execute(method, args, &block)
end
+ # Closes all open connections and reloads the client pool.
+ #
+ # Normally host information from the last time the node pool was reloaded
+ # is used, but if the `use_initial_hosts` option is set to `true`, then the client
+ # is completely refreshed and the hosts that were specified when creating
+ # it originally are set instead.
+ def reconnect(options = {})
+ use_initial_hosts = options.fetch(:use_initial_hosts, false)
+
+ @hosts = @initial_hosts.dup if use_initial_hosts
+
+ @mutex.synchronize do
+ @pool.nodes.each{|node| node.connection.close}
+ @pool.nodes.clear
+ reload_pool_nodes_unsync
+ end
+ end
+
private
# Adds only a single node to the client pool and assigns it the
# entire space of slots. This is useful when running either a standalone
# Redis or a single-node Redis Cluster.
def create_single_node_pool
- host = @startup_hosts
+ host = @hosts
if host.is_a?(Array)
if host.length > 1
raise ArgumentError, "Can only create single node pool for single host"
end
@@ -100,75 +152,110 @@
# client pool.
host = host.first
end
@pool.add_node!(host, [(0..Configuration::HASH_SLOTS)])
+
+ unless @logger.nil?
+ @logger.info("redis_cluster: Initialized single node pool: #{host}")
+ end
end
- def create_multi_node_pool(raise_error)
- unless @startup_hosts.is_a?(Array)
+ def create_multi_node_pool
+ unless @hosts.is_a?(Array)
raise ArgumentError, "Can only create multi-node pool for multiple hosts"
end
- @startup_hosts.each do |options|
- begin
+ begin
+ retry_intermittent_loop do |attempt|
+ # Try a random host from our seed pool.
+ options = @hosts.sample
+
redis = Node.redis(@pool.global_configs.merge(options))
slots_mapping = redis.cluster("slots").group_by{|x| x[2]}
@pool.delete_except!(slots_mapping.keys)
slots_mapping.each do |host, infos|
slots_ranges = infos.map {|x| x[0]..x[1] }
@pool.add_node!({host: host[0], port: host[1]}, slots_ranges)
end
- rescue Redis::CommandError => e
- if e.message =~ /cluster\ support\ disabled$/
- if !@force_cluster
- # We're running outside of cluster-mode -- just create a
- # single-node pool and move on. The exception is if we've been
- # asked for force Redis Cluster, in which case we assume this is
- # a configuration problem and maybe raise an error.
- create_single_node_pool
- return
- elsif raise_error
- raise e
- end
- end
+ end
+ rescue Redis::CommandError => e
+ unless @logger.nil?
+ @logger.error("redis_cluster: Received error: #{e}")
+ end
- raise e if e.message =~ /NOAUTH\ Authentication\ required/
-
- # TODO: log error for visibility
- next
- rescue
- # TODO: log error for visibility
- next
+ if e.message =~ /cluster\ support\ disabled$/ && !@force_cluster
+ # We're running outside of cluster-mode -- just create a single-node
+ # pool and move on. The exception is if we've been asked to force
+ # Redis Cluster, in which case we assume this is a configuration
+ # problem and re-raise the error.
+ create_single_node_pool
+ return
end
- # We only need to see a `CLUSTER SLOTS` result from a single host, so
- # break after one success.
- break
+ raise
end
+
+ unless @logger.nil?
+ mappings = @pool.nodes.map{|node| "#{node.slots} -> #{node.options}"}
+ @logger.info("redis_cluster: Initialized multi-node pool: #{mappings}")
+ end
end
# Reloads the client node pool by requesting new information with `CLUSTER
# SLOTS` or just adding a node directly if running on standalone. Clients
# are "upserted" so that we don't necessarily drop clients that are still
# relevant.
- def reload_pool_nodes(raise_error)
+ def reload_pool_nodes
@mutex.synchronize do
- if @startup_hosts.is_a?(Array)
- create_multi_node_pool(raise_error)
- refresh_startup_nodes
- else
- create_single_node_pool
- end
+ reload_pool_nodes_unsync
end
end
- # Refreshes the contents of @startup_hosts based on the hosts currently in
+ # The same as `#reload_pool_nodes`, but doesn't attempt to synchronize on
+ # the mutex. Use this only if you've already got a lock on it.
+ def reload_pool_nodes_unsync
+ if @hosts.is_a?(Array)
+ create_multi_node_pool
+ refresh_startup_nodes
+ else
+ create_single_node_pool
+ end
+ end
+
+ # Refreshes the contents of @hosts based on the hosts currently in
# the client pool. This is useful because we may have been told about new
# hosts after running `CLUSTER SLOTS`.
def refresh_startup_nodes
- @pool.nodes.each {|node| @startup_hosts.push(node.host_hash) }
- @startup_hosts.uniq!
+ @pool.nodes.each {|node| @hosts.push(node.host_hash) }
+ @hosts.uniq!
+ end
+
+ # Retries an operation @retry_count times for intermittent connection
+ # errors. After exhausting retries, the error that was received on the last
+ # attempt is raised to the user.
+ def retry_intermittent_loop
+ last_error = nil
+
+ for attempt in 0..(@retry_count) do
+ begin
+ yield(attempt)
+
+ # Fall through on any success.
+ return
+ rescue Errno::EACCES, Redis::TimeoutError, Redis::CannotConnectError => e
+ last_error = e
+
+ unless @logger.nil?
+ @logger.error("redis_cluster: Received error: #{e} retries_left=#{@retry_count - attempt}")
+ end
+ end
+ end
+
+ # If we ran out of retries (the maximum number may have been set to 0),
+ # surface any error that was thrown back to the caller. We'd otherwise
+ # suppress the error, which would return something quite unexpected.
+ raise last_error
end
end # end client
end