Sha256: 2b76c7c8a291ec77b13f33b0cf52113b0fb2f0f5e0065df9330041af3e950ff8

Contents?: true

Size: 1.97 KB

Versions: 1

Compression:

Stored size: 1.97 KB

Contents

require "thread"

module RedisCluster

  class Client

    def initialize(startup_hosts, global_configs = {})
      @startup_hosts = startup_hosts
      @pool = Pool.new
      @mutex = Mutex.new
      reload_pool_nodes
    end

    def execute(method, args)
      ttl = Configuration::REQUEST_TTL
      asking = false
      try_random_node = false

      while ttl > 0
        ttl -= 1
        begin
          return @pool.execute(method, args, {asking: asking, random_node: try_random_node})
        rescue Errno::ECONNREFUSED, Redis::TimeoutError, Redis::CannotConnectError, Errno::EACCES
          try_random_node = true
          sleep 0.1 if ttl < Configuration::REQUEST_TTL/2
        rescue => e
          err_code = e.to_s.split.first
          raise e unless %w(MOVED ASK).include?(err_code)

          if err_code == "ASK"
             asking = true
          else
            reload_pool_nodes
            sleep 0.1 if ttl < Configuration::REQUEST_TTL/2
          end
        end
      end
    end

    Configuration::SUPPORT_SINGLE_NODE_METHODS.each do |method_name|
      define_method method_name do |*args|
        execute(method_name, args)
      end
    end

    def method_missing(method, *args, &block)
      execute(method, args)
    end

    private

    def reload_pool_nodes
      @mutex.synchronize do
        @startup_hosts.each do |options|
          begin
            redis = Node.redis(options)
            slots_mapping = redis.cluster("slots").group_by{|x| x[2]}
            @pool.delete_except!(slots_mapping.keys)
            slots_mapping.each do |host, infos|
              slots_ranges = infos.map {|x| x[0]..x[1] }
              @pool.add_node!({host: host[0], port: host[1]}, slots_ranges)
            end
          rescue
            next
          end
          break
        end
        fresh_startup_nodes
      end
    end

    def fresh_startup_nodes
      @pool.nodes.each {|node| @startup_hosts.push(node.host_hash) }
      @startup_hosts.uniq!
    end

  end # end client

end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
redis_cluster-0.2.3 lib/redis_cluster/client.rb