lib/redis_failover/client.rb in nogara-redis_failover-0.8.9 vs lib/redis_failover/client.rb in nogara-redis_failover-0.8.10

- old
+ new

@@ -25,56 +25,10 @@ ZNODE_UPDATE_TIMEOUT = 9 # Amount of time to sleep before retrying a failed operation. RETRY_WAIT_TIME = 3 - # Redis read operations that are automatically dispatched to slaves. Any - # operation not listed here will be dispatched to the master. - REDIS_READ_OPS = Set[ - :echo, - :exists, - :get, - :getbit, - :getrange, - :hexists, - :hget, - :hgetall, - :hkeys, - :hlen, - :hmget, - :hvals, - :keys, - :lindex, - :llen, - :lrange, - :mapped_hmget, - :mapped_mget, - :mget, - :scard, - :sdiff, - :sinter, - :sismember, - :smembers, - :srandmember, - :strlen, - :sunion, - :type, - :zcard, - :zcount, - :zrange, - :zrangebyscore, - :zrank, - :zrevrange, - :zrevrangebyscore, - :zrevrank, - :zscore - ].freeze - - # Unsupported Redis operations. These don't make sense in a client - # that abstracts the master/slave servers. - UNSUPPORTED_OPS = Set[:select, :dbsize].freeze - # Performance optimization: to avoid unnecessary method_missing calls, # we proactively define methods that dispatch to the underlying redis # calls. Redis.public_instance_methods(false).each do |method| define_method(method) do |*args, &block| @@ -91,29 +45,41 @@ # @option options [String] :db database to use for redis nodes # @option options [String] :namespace namespace for redis nodes # @option options [Logger] :logger logger override # @option options [Boolean] :retry_failure indicates if failures are retried # @option options [Integer] :max_retries max retries for a failure + # @option options [Boolean] :safe_mode indicates if safe mode is used or not + # @option options [Boolean] :master_only indicates if only redis master is used # @return [RedisFailover::Client] def initialize(options = {}) Util.logger = options[:logger] if options[:logger] - @zkservers = options.fetch(:zkservers) { raise ArgumentError, ':zkservers required'} - @znode = options[:znode_path] || Util::DEFAULT_ZNODE_PATH - @namespace = options[:namespace] - @password = options[:password] - @db = options[:db] - @retry = options[:retry_failure] || true - @max_retries = @retry ? options.fetch(:max_retries, 3) : 0 @master = nil @slaves = [] @node_addresses = {} @lock = Monitor.new @current_client_key = "current-client-#{self.object_id}" + yield self if block_given? + + parse_options(options) setup_zk build_clients end + # Specifies a callback to invoke when the current redis node list changes. + # + # @param [Proc] a callback with current master and slaves as arguments + # + # @example Usage + # RedisFailover::Client.new(:zkservers => zk_servers) do |client| + # client.on_node_change do |master, slaves| + # logger.info("Nodes changed! master: #{master}, slaves: #{slaves}") + # end + # end + def on_node_change(&callback) + @on_node_change = callback + end + # Dispatches redis operations to master/slaves. def method_missing(method, *args, &block) if redis_operation?(method) dispatch(method, *args, &block) else @@ -166,17 +132,35 @@ purge_clients @zk ? @zk.reopen : setup_zk build_clients end + # Retrieves the current redis master. + # + # @return [String] the host/port of the current master + def current_master + master = @lock.synchronize { @master } + address_for(master) + end + + # Retrieves the current redis slaves. + # + # @return [Array<String>] an array of known slave host/port addresses + def current_slaves + slaves = @lock.synchronize { @slaves } + addresses_for(slaves) + end + private # Sets up the underlying ZooKeeper connection. def setup_zk @zk = ZK.new(@zkservers) @zk.watcher.register(@znode) { |event| handle_zk_event(event) } - @zk.on_expired_session { purge_clients } + if @safe_mode + @zk.on_expired_session { purge_clients } + end @zk.on_connected { @zk.stat(@znode, :watch => true) } @zk.stat(@znode, :watch => true) update_znode_timestamp end @@ -208,11 +192,11 @@ # @param [Symbol] method the method to dispatch # @param [Array] args the arguments to pass to the method # @param [Proc] block an optional block to pass to the method # @return [Object] the result of dispatching the command def dispatch(method, *args, &block) - unless recently_heard_from_node_manager? + if @safe_mode && !recently_heard_from_node_manager? build_clients end verify_supported!(method) tries = 0 @@ -275,14 +259,30 @@ @master = new_master @slaves = new_slaves rescue purge_clients raise + ensure + if should_notify? + @on_node_change.call(current_master, current_slaves) + @last_notified_master = current_master + @last_notified_slaves = current_slaves + end end end end + # Determines if the on_node_change callback should be invoked. + # + # @return [Boolean] true if callback should be invoked, false otherwise + def should_notify? + return false unless @on_node_change + return true if @last_notified_master != current_master + return true if different?(Array(@last_notified_slaves), current_slaves) + false + end + # Fetches the known redis nodes from ZooKeeper. # # @return [Hash] the known master/slave redis servers def fetch_nodes data = @zk.get(@znode, :watch => true).first @@ -421,21 +421,44 @@ # @note # This method stores a stack of clients used to handle the case # where the same RedisFailover::Client instance is referenced by # nested blocks (e.g., block passed to multi). def client_for(method) - # stack = Thread.current[@current_client_key] ||= [] - # client = stack.last || (REDIS_READ_OPS.include?(method) ? slave : master) - # stack << client - # client - master + stack = Thread.current[@current_client_key] ||= [] + client = if stack.last + stack.last + elsif @master_only + master + elsif REDIS_READ_OPS.include?(method) + slave + else + master + end + + stack << client + client end # Pops a client from the thread-local client stack. def free_client if stack = Thread.current[@current_client_key] stack.pop end nil + end + + # Parses the configuration operations. + # + # @param [Hash] options the configuration options + def parse_options(options) + @zkservers = options.fetch(:zkservers) { raise ArgumentError, ':zkservers required'} + @znode = options.fetch(:znode_path, Util::DEFAULT_ZNODE_PATH) + @namespace = options[:namespace] + @password = options[:password] + @db = options[:db] + @retry = options.fetch(:retry_failure, true) + @max_retries = @retry ? options.fetch(:max_retries, 3) : 0 + @safe_mode = options.fetch(:safe_mode, true) + @master_only = options.fetch(:master_only, false) end end end