lib/redis_cluster/client.rb in redis_cluster-0.3.0 vs lib/redis_cluster/client.rb in redis_cluster-0.3.1

- old
+ new

@@ -2,78 +2,112 @@ module RedisCluster class Client - def initialize(startup_hosts, configs = {}) - @startup_hosts = startup_hosts + def initialize(hosts, configs = {}) + @hosts = hosts.dup + @initial_hosts = hosts.dup # Extract configuration options relevant to Redis Cluster. # force_cluster defaults to true to match the client's behavior before # the option existed @force_cluster = configs.delete(:force_cluster) { |_key| true } - # The number of times to retry a failed execute. Redis errors, `MOVE`, or - # `ASK` are all considered failures that will count towards this tally. A - # count of at least 2 is probably sensible because if a node disappears - # the first try will be a Redis error, the second try retry will probably - # be a `MOVE` (whereupon the node pool is reloaded), and it will take - # until the third try to succeed. - @retry_count = configs.delete(:retry_count) { |_key| 3 } + # An optional logger. Should respond like the standard Ruby `Logger`: + # + # http://ruby-doc.org/stdlib-2.4.0/libdoc/logger/rdoc/Logger.html + @logger = configs.delete(:logger) { |_key| nil } + # The number of times to retry when it detects a failure that looks like + # it might be intermittent. + # + # It might be worth setting this to `0` if you'd like full visibility + # around what kinds of errors are occurring. Possibly in conjunction with + # your own out-of-library retry loop and/or circuit breaker. + @retry_count = configs.delete(:retry_count) { |_key| 2 } + # Any leftover configuration goes through to the pool and onto individual # Redis clients. @pool = Pool.new(configs) @mutex = Mutex.new - reload_pool_nodes(true) + reload_pool_nodes end def execute(method, args, &block) asking = false - last_error = nil - retries_left = @retry_count - try_random_node = false + retried = false - # We use `>= 0` instead of `> 0` because we decrement this counter on the - # first run. - while retries_left >= 0 - retries_left -= 1 - - begin - return @pool.execute(method, args, {asking: asking, random_node: try_random_node}, &block) - - rescue Errno::ECONNREFUSED, Redis::TimeoutError, Redis::CannotConnectError, Errno::EACCES => e - last_error = e - + # Note that there are two levels of retry loops here. + # + # The first is for intermittent failures like "host unreachable" or + # timeouts. These are retried a number of times equal to @retry_count. + # + # The second is when it receives an `ASK` or `MOVED` error response from + # Redis. In this case the client will complete re-enter its execution + # loop and retry the command after any necessary prework (if `MOVED`, it + # will attempt to reload the node pool first). This will only ever be + # retried one time (see notes below). This loop uses Ruby's `retry` + # syntax for blocks, so keep an eye out for that in the code below. + # + # It's worth noting that if these conditions ever combine, you could see + # more network attempts than @retry_count. An initial execution attempt + # might fail intermittently a couple times before sending a `MOVED`. The + # client will then attempt to reload the node pool, an operation which is + # also retried for intermittent failures. It could then return to the + # main execution and fail another couple of times intermittently. This + # should be an extreme edge case, but it's worth considering if you're + # running at large scale. + begin + retry_intermittent_loop do |attempt| # Getting an error while executing may be an indication that we've # lost the node that we were talking to and in that case it makes # sense to try a different node and maybe reload our node pool (if # the new node issues a `MOVE`). - try_random_node = true + try_random_node = attempt > 0 - rescue => e - last_error = e + return @pool.execute(method, args, {asking: asking, random_node: try_random_node}, &block) + end + rescue Redis::CommandError => e + unless @logger.nil? + @logger.error("redis_cluster: Received error: #{e}") + end - err_code = e.to_s.split.first - raise e unless %w(MOVED ASK).include?(err_code) + # This is a special condition to protect against a misbehaving library + # or server. After we've gotten one ASK or MOVED and retried once, + # we'll never do so a second time. Receiving two of any operations in a + # row is probably indicative of a problem and we don't want to get + # stuck in an infinite retry loop. + raise if retried + retried = true - if err_code == 'ASK' - asking = true - else - # `MOVED` indicates a permanent redirect which means that our slot - # mappings are stale: reload them. - reload_pool_nodes(false) + err_code = e.to_s.split.first + case err_code + when 'ASK' + unless @logger.nil? + @logger.info("redis_cluster: Received ASK; retrying operation (#{e})") end + + asking = true + retry + + when 'MOVED' + unless @logger.nil? + @logger.info("redis_cluster: Received MOVED; retrying operation (#{e})") + end + + # `MOVED` indicates a permanent redirect which means that our slot + # mappings are stale: reload them then try what we were doing again + reload_pool_nodes + retry + + else + raise end end - - # If we ran out of retries (the maximum number may have been set to 0), - # surface any error that was thrown back to the caller. We'd otherwise - # suppress the error, which would return something quite unexpected. - raise last_error end Configuration.method_names.each do |method_name| define_method method_name do |*args, &block| execute(method_name, args, &block) @@ -82,17 +116,35 @@ def method_missing(method, *args, &block) execute(method, args, &block) end + # Closes all open connections and reloads the client pool. + # + # Normally host information from the last time the node pool was reloaded + # is used, but if the `use_initial_hosts` is set to `true`, then the client + # is completely refreshed and the hosts that were specified when creating + # it originally are set instead. + def reconnect(options = {}) + use_initial_hosts = options.fetch(:use_initial_hosts, false) + + @hosts = @initial_hosts.dup if use_initial_hosts + + @mutex.synchronize do + @pool.nodes.each{|node| node.connection.close} + @pool.nodes.clear + reload_pool_nodes_unsync + end + end + private # Adds only a single node to the client pool and sets it result for the # entire space of slots. This is useful when running either a standalone # Redis or a single-node Redis Cluster. def create_single_node_pool - host = @startup_hosts + host = @hosts if host.is_a?(Array) if host.length > 1 raise ArgumentError, "Can only create single node pool for single host" end @@ -100,75 +152,110 @@ # client pool. host = host.first end @pool.add_node!(host, [(0..Configuration::HASH_SLOTS)]) + + unless @logger.nil? + @logger.info("redis_cluster: Initialized single node pool: #{host}") + end end - def create_multi_node_pool(raise_error) - unless @startup_hosts.is_a?(Array) + def create_multi_node_pool + unless @hosts.is_a?(Array) raise ArgumentError, "Can only create multi-node pool for multiple hosts" end - @startup_hosts.each do |options| - begin + begin + retry_intermittent_loop do |attempt| + # Try a random host from our seed pool. + options = @hosts.sample + redis = Node.redis(@pool.global_configs.merge(options)) slots_mapping = redis.cluster("slots").group_by{|x| x[2]} @pool.delete_except!(slots_mapping.keys) slots_mapping.each do |host, infos| slots_ranges = infos.map {|x| x[0]..x[1] } @pool.add_node!({host: host[0], port: host[1]}, slots_ranges) end - rescue Redis::CommandError => e - if e.message =~ /cluster\ support\ disabled$/ - if !@force_cluster - # We're running outside of cluster-mode -- just create a - # single-node pool and move on. The exception is if we've been - # asked for force Redis Cluster, in which case we assume this is - # a configuration problem and maybe raise an error. - create_single_node_pool - return - elsif raise_error - raise e - end - end + end + rescue Redis::CommandError => e + unless @logger.nil? + @logger.error("redis_cluster: Received error: #{e}") + end - raise e if e.message =~ /NOAUTH\ Authentication\ required/ - - # TODO: log error for visibility - next - rescue - # TODO: log error for visibility - next + if e.message =~ /cluster\ support\ disabled$/ && !@force_cluster + # We're running outside of cluster-mode -- just create a single-node + # pool and move on. The exception is if we've been asked for force + # Redis Cluster, in which case we assume this is a configuration + # problem and maybe raise an error. + create_single_node_pool + return end - # We only need to see a `CLUSTER SLOTS` result from a single host, so - # break after one success. - break + raise end + + unless @logger.nil? + mappings = @pool.nodes.map{|node| "#{node.slots} -> #{node.options}"} + @logger.info("redis_cluster: Initialized multi-node pool: #{mappings}") + end end # Reloads the client node pool by requesting new information with `CLUSTER # SLOTS` or just adding a node directly if running on standalone. Clients # are "upserted" so that we don't necessarily drop clients that are still # relevant. - def reload_pool_nodes(raise_error) + def reload_pool_nodes @mutex.synchronize do - if @startup_hosts.is_a?(Array) - create_multi_node_pool(raise_error) - refresh_startup_nodes - else - create_single_node_pool - end + reload_pool_nodes_unsync end end - # Refreshes the contents of @startup_hosts based on the hosts currently in + # The same as `#reload_pool_nodes`, but doesn't attempt to synchronize on + # the mutex. Use this only if you've already got a lock on it. + def reload_pool_nodes_unsync + if @hosts.is_a?(Array) + create_multi_node_pool + refresh_startup_nodes + else + create_single_node_pool + end + end + + # Refreshes the contents of @hosts based on the hosts currently in # the client pool. This is useful because we may have been told about new # hosts after running `CLUSTER SLOTS`. def refresh_startup_nodes - @pool.nodes.each {|node| @startup_hosts.push(node.host_hash) } - @startup_hosts.uniq! + @pool.nodes.each {|node| @hosts.push(node.host_hash) } + @hosts.uniq! + end + + # Retries an operation @retry_count times for intermittent connection + # errors. After exhausting retries, the error that was received on the last + # attempt is raised to the user. + def retry_intermittent_loop + last_error = nil + + for attempt in 0..(@retry_count) do + begin + yield(attempt) + + # Fall through on any success. + return + rescue Errno::EACCES, Redis::TimeoutError, Redis::CannotConnectError => e + last_error = e + + unless @logger.nil? + @logger.error("redis_cluster: Received error: #{e} retries_left=#{@retry_count - attempt}") + end + end + end + + # If we ran out of retries (the maximum number may have been set to 0), + # surface any error that was thrown back to the caller. We'd otherwise + # suppress the error, which would return something quite unexpected. + raise last_error end end # end client end