lib/redis_failover/client.rb in redis_failover-0.8.9 vs lib/redis_failover/client.rb in redis_failover-0.9.0
- old
+ new
@@ -25,56 +25,10 @@
ZNODE_UPDATE_TIMEOUT = 9
# Amount of time to sleep before retrying a failed operation.
RETRY_WAIT_TIME = 3
- # Redis read operations that are automatically dispatched to slaves. Any
- # operation not listed here will be dispatched to the master.
- REDIS_READ_OPS = Set[
- :echo,
- :exists,
- :get,
- :getbit,
- :getrange,
- :hexists,
- :hget,
- :hgetall,
- :hkeys,
- :hlen,
- :hmget,
- :hvals,
- :keys,
- :lindex,
- :llen,
- :lrange,
- :mapped_hmget,
- :mapped_mget,
- :mget,
- :scard,
- :sdiff,
- :sinter,
- :sismember,
- :smembers,
- :srandmember,
- :strlen,
- :sunion,
- :type,
- :zcard,
- :zcount,
- :zrange,
- :zrangebyscore,
- :zrank,
- :zrevrange,
- :zrevrangebyscore,
- :zrevrank,
- :zscore
- ].freeze
-
- # Unsupported Redis operations. These don't make sense in a client
- # that abstracts the master/slave servers.
- UNSUPPORTED_OPS = Set[:select, :dbsize].freeze
-
# Performance optimization: to avoid unnecessary method_missing calls,
# we proactively define methods that dispatch to the underlying redis
# calls.
Redis.public_instance_methods(false).each do |method|
define_method(method) do |*args, &block|
@@ -91,29 +45,41 @@
# @option options [String] :db database to use for redis nodes
# @option options [String] :namespace namespace for redis nodes
# @option options [Logger] :logger logger override
# @option options [Boolean] :retry_failure indicates if failures are retried
# @option options [Integer] :max_retries max retries for a failure
+ # @option options [Boolean] :safe_mode indicates if safe mode is used or not
+ # @option options [Boolean] :master_only indicates if only redis master is used
# @return [RedisFailover::Client]
def initialize(options = {})
Util.logger = options[:logger] if options[:logger]
- @zkservers = options.fetch(:zkservers) { raise ArgumentError, ':zkservers required'}
- @znode = options[:znode_path] || Util::DEFAULT_ZNODE_PATH
- @namespace = options[:namespace]
- @password = options[:password]
- @db = options[:db]
- @retry = options[:retry_failure] || true
- @max_retries = @retry ? options.fetch(:max_retries, 3) : 0
@master = nil
@slaves = []
@node_addresses = {}
@lock = Monitor.new
@current_client_key = "current-client-#{self.object_id}"
+ yield self if block_given?
+
+ parse_options(options)
setup_zk
build_clients
end
+ # Specifies a callback to invoke when the current redis node list changes.
+ #
+ # @param [Proc] a callback with current master and slaves as arguments
+ #
+ # @example Usage
+ # RedisFailover::Client.new(:zkservers => zk_servers) do |client|
+ # client.on_node_change do |master, slaves|
+ # logger.info("Nodes changed! master: #{master}, slaves: #{slaves}")
+ # end
+ # end
+ def on_node_change(&callback)
+ @on_node_change = callback
+ end
+
# Dispatches redis operations to master/slaves.
def method_missing(method, *args, &block)
if redis_operation?(method)
dispatch(method, *args, &block)
else
@@ -166,17 +132,35 @@
purge_clients
@zk ? @zk.reopen : setup_zk
build_clients
end
+ # Retrieves the current redis master.
+ #
+ # @return [String] the host/port of the current master
+ def current_master
+ master = @lock.synchronize { @master }
+ address_for(master)
+ end
+
+ # Retrieves the current redis slaves.
+ #
+ # @return [Array<String>] an array of known slave host/port addresses
+ def current_slaves
+ slaves = @lock.synchronize { @slaves }
+ addresses_for(slaves)
+ end
+
private
# Sets up the underlying ZooKeeper connection.
def setup_zk
@zk = ZK.new(@zkservers)
@zk.watcher.register(@znode) { |event| handle_zk_event(event) }
- @zk.on_expired_session { purge_clients }
+ if @safe_mode
+ @zk.on_expired_session { purge_clients }
+ end
@zk.on_connected { @zk.stat(@znode, :watch => true) }
@zk.stat(@znode, :watch => true)
update_znode_timestamp
end
@@ -208,11 +192,11 @@
# @param [Symbol] method the method to dispatch
# @param [Array] args the arguments to pass to the method
# @param [Proc] block an optional block to pass to the method
# @return [Object] the result of dispatching the command
def dispatch(method, *args, &block)
- unless recently_heard_from_node_manager?
+ if @safe_mode && !recently_heard_from_node_manager?
build_clients
end
verify_supported!(method)
tries = 0
@@ -275,14 +259,30 @@
@master = new_master
@slaves = new_slaves
rescue
purge_clients
raise
+ ensure
+ if should_notify?
+ @on_node_change.call(current_master, current_slaves)
+ @last_notified_master = current_master
+ @last_notified_slaves = current_slaves
+ end
end
end
end
+ # Determines if the on_node_change callback should be invoked.
+ #
+ # @return [Boolean] true if callback should be invoked, false otherwise
+ def should_notify?
+ return false unless @on_node_change
+ return true if @last_notified_master != current_master
+ return true if different?(Array(@last_notified_slaves), current_slaves)
+ false
+ end
+
# Fetches the known redis nodes from ZooKeeper.
#
# @return [Hash] the known master/slave redis servers
def fetch_nodes
data = @zk.get(@znode, :watch => true).first
@@ -422,19 +422,43 @@
# This method stores a stack of clients used to handle the case
# where the same RedisFailover::Client instance is referenced by
# nested blocks (e.g., block passed to multi).
def client_for(method)
stack = Thread.current[@current_client_key] ||= []
- client = stack.last || (REDIS_READ_OPS.include?(method) ? slave : master)
+ client = if stack.last
+ stack.last
+ elsif @master_only
+ master
+ elsif REDIS_READ_OPS.include?(method)
+ slave
+ else
+ master
+ end
+
stack << client
client
end
# Pops a client from the thread-local client stack.
def free_client
if stack = Thread.current[@current_client_key]
stack.pop
end
nil
+ end
+
+ # Parses the configuration operations.
+ #
+ # @param [Hash] options the configuration options
+ def parse_options(options)
+ @zkservers = options.fetch(:zkservers) { raise ArgumentError, ':zkservers required'}
+ @znode = options.fetch(:znode_path, Util::DEFAULT_ZNODE_PATH)
+ @namespace = options[:namespace]
+ @password = options[:password]
+ @db = options[:db]
+ @retry = options.fetch(:retry_failure, true)
+ @max_retries = @retry ? options.fetch(:max_retries, 3) : 0
+ @safe_mode = options.fetch(:safe_mode, true)
+ @master_only = options.fetch(:master_only, false)
end
end
end