lib/redis_cluster/client.rb in redis_cluster-0.2.9 vs lib/redis_cluster/client.rb in redis_cluster-0.3.0
- old
+ new
@@ -2,44 +2,81 @@
module RedisCluster
class Client
- def initialize(startup_hosts, global_configs = {})
+ def initialize(startup_hosts, configs = {})
@startup_hosts = startup_hosts
- @pool = Pool.new(global_configs)
+
+ # Extract configuration options relevant to Redis Cluster.
+
+ # force_cluster defaults to true to match the client's behavior before
+ # the option existed
+ @force_cluster = configs.delete(:force_cluster) { |_key| true }
+
+ # The number of times to retry a failed execute. Redis errors, `MOVE`, or
+ # `ASK` are all considered failures that will count towards this tally. A
+ # count of at least 2 is probably sensible because if a node disappears
+ # the first try will be a Redis error, the second retry will probably
+ # be a `MOVE` (whereupon the node pool is reloaded), and it will take
+ # until the third try to succeed.
+ @retry_count = configs.delete(:retry_count) { |_key| 3 }
+
+ # Any leftover configuration goes through to the pool and onto individual
+ # Redis clients.
+ @pool = Pool.new(configs)
@mutex = Mutex.new
+
reload_pool_nodes(true)
end
def execute(method, args, &block)
- ttl = Configuration::REQUEST_TTL
asking = false
+ last_error = nil
+ retries_left = @retry_count
try_random_node = false
- while ttl > 0
- ttl -= 1
+ # We use `>= 0` instead of `> 0` because we decrement this counter on the
+ # first run.
+ while retries_left >= 0
+ retries_left -= 1
+
begin
return @pool.execute(method, args, {asking: asking, random_node: try_random_node}, &block)
- rescue Errno::ECONNREFUSED, Redis::TimeoutError, Redis::CannotConnectError, Errno::EACCES
+
+ rescue Errno::ECONNREFUSED, Redis::TimeoutError, Redis::CannotConnectError, Errno::EACCES => e
+ last_error = e
+
+ # Getting an error while executing may be an indication that we've
+ # lost the node that we were talking to and in that case it makes
+ # sense to try a different node and maybe reload our node pool (if
+ # the new node issues a `MOVE`).
try_random_node = true
- sleep 0.1 if ttl < Configuration::REQUEST_TTL/2
+
rescue => e
+ last_error = e
+
err_code = e.to_s.split.first
raise e unless %w(MOVED ASK).include?(err_code)
if err_code == 'ASK'
asking = true
else
- reload_pool_nodes
- sleep 0.1 if ttl < Configuration::REQUEST_TTL/2
+ # `MOVED` indicates a permanent redirect which means that our slot
+ # mappings are stale: reload them.
+ reload_pool_nodes(false)
end
end
end
+
+ # If we ran out of retries (the maximum number may have been set to 0),
+ # surface any error that was thrown back to the caller. We'd otherwise
+ # suppress the error, which would return something quite unexpected.
+ raise last_error
end
- Configuration::SUPPORT_SINGLE_NODE_METHODS.each do |method_name|
+ Configuration.method_names.each do |method_name|
define_method method_name do |*args, &block|
execute(method_name, args, &block)
end
end
@@ -47,36 +84,89 @@
execute(method, args, &block)
end
private
- def reload_pool_nodes(raise_error = false)
- return @pool.add_node!(@startup_hosts, [(0..Configuration::HASH_SLOTS)]) unless @startup_hosts.is_a? Array
+ # Adds only a single node to the client pool and sets it as the result for the
+ # entire space of slots. This is useful when running either a standalone
+ # Redis or a single-node Redis Cluster.
+ def create_single_node_pool
+ host = @startup_hosts
+ if host.is_a?(Array)
+ if host.length > 1
+ raise ArgumentError, "Can only create single node pool for single host"
+ end
- @mutex.synchronize do
- @startup_hosts.each do |options|
- begin
- redis = Node.redis(@pool.global_configs.merge(options))
- slots_mapping = redis.cluster("slots").group_by{|x| x[2]}
- @pool.delete_except!(slots_mapping.keys)
- slots_mapping.each do |host, infos|
- slots_ranges = infos.map {|x| x[0]..x[1] }
- @pool.add_node!({host: host[0], port: host[1]}, slots_ranges)
+ # Flatten the configured host so that we can easily add it to the
+ # client pool.
+ host = host.first
+ end
+
+ @pool.add_node!(host, [(0..Configuration::HASH_SLOTS)])
+ end
+
+ def create_multi_node_pool(raise_error)
+ unless @startup_hosts.is_a?(Array)
+ raise ArgumentError, "Can only create multi-node pool for multiple hosts"
+ end
+
+ @startup_hosts.each do |options|
+ begin
+ redis = Node.redis(@pool.global_configs.merge(options))
+ slots_mapping = redis.cluster("slots").group_by{|x| x[2]}
+ @pool.delete_except!(slots_mapping.keys)
+ slots_mapping.each do |host, infos|
+ slots_ranges = infos.map {|x| x[0]..x[1] }
+ @pool.add_node!({host: host[0], port: host[1]}, slots_ranges)
+ end
+ rescue Redis::CommandError => e
+ if e.message =~ /cluster\ support\ disabled$/
+ if !@force_cluster
+ # We're running outside of cluster-mode -- just create a
+ # single-node pool and move on. The exception is if we've been
+ # asked to force Redis Cluster, in which case we assume this is
+ # a configuration problem and maybe raise an error.
+ create_single_node_pool
+ return
+ elsif raise_error
+ raise e
end
- rescue Redis::CommandError => e
- raise e if raise_error && e.message =~ /cluster\ support\ disabled$/
- raise e if e.message =~ /NOAUTH\ Authentication\ required/
- next
- rescue
- next
end
- break
+
+ raise e if e.message =~ /NOAUTH\ Authentication\ required/
+
+ # TODO: log error for visibility
+ next
+ rescue
+ # TODO: log error for visibility
+ next
end
- fresh_startup_nodes
+
+ # We only need to see a `CLUSTER SLOTS` result from a single host, so
+ # break after one success.
+ break
end
end
- def fresh_startup_nodes
+ # Reloads the client node pool by requesting new information with `CLUSTER
+ # SLOTS` or just adding a node directly if running on standalone. Clients
+ # are "upserted" so that we don't necessarily drop clients that are still
+ # relevant.
+ def reload_pool_nodes(raise_error)
+ @mutex.synchronize do
+ if @startup_hosts.is_a?(Array)
+ create_multi_node_pool(raise_error)
+ refresh_startup_nodes
+ else
+ create_single_node_pool
+ end
+ end
+ end
+
+ # Refreshes the contents of @startup_hosts based on the hosts currently in
+ # the client pool. This is useful because we may have been told about new
+ # hosts after running `CLUSTER SLOTS`.
+ def refresh_startup_nodes
@pool.nodes.each {|node| @startup_hosts.push(node.host_hash) }
@startup_hosts.uniq!
end
end # end client