lib/redis_failover/client.rb in redis_failover-0.8.0 vs lib/redis_failover/client.rb in redis_failover-0.8.1
- old
+ new
@@ -6,12 +6,11 @@
# registers and listens for watcher events from the Node Manager. When these events are received,
# the client fetches the latest set of redis nodes from ZooKeeper and rebuilds its internal
# Redis clients appropriately. RedisFailover::Client also directs write operations to the master,
# and all read operations to the slaves.
#
- # Examples
- #
+ # @example Usage
# client = RedisFailover::Client.new(:zkservers => 'localhost:2181,localhost:2182,localhost:2183')
# client.set('foo', 1) # will be directed to master
# client.get('foo') # will be directed to a slave
#
class Client
@@ -84,19 +83,20 @@
end
end
# Creates a new failover redis client.
#
- # Options:
- # :zkservers - comma-separated ZooKeeper host:port pairs (required)
- # :znode_path - the Znode path override for redis server list (optional)
- # :password - password for redis nodes (optional)
- # :db - db to use for redis nodes (optional)
- # :namespace - namespace for redis nodes (optional)
- # :logger - logger override (optional)
- # :retry_failure - indicate if failures should be retried (default true)
- # :max_retries - max retries for a failure (default 3)
+ # @param [Hash] options the options used to initialize the client instance
+ # @option options [String] :zkservers comma-separated ZooKeeper host:port pairs (required)
+ # @option options [String] :znode_path znode path override for redis server list
+ # @option options [String] :password password for redis nodes
+ # @option options [String] :db database to use for redis nodes
+ # @option options [String] :namespace namespace for redis nodes
+ # @option options [Logger] :logger logger override
+ # @option options [Boolean] :retry_failure indicates if failures should be retried
+ # @option options [Integer] :max_retries max retries for a failure
+ # @return [RedisFailover::Client]
def initialize(options = {})
Util.logger = options[:logger] if options[:logger]
@zkservers = options.fetch(:zkservers) { raise ArgumentError, ':zkservers required'}
@znode = options[:znode_path] || Util::DEFAULT_ZNODE_PATH
@namespace = options[:namespace]
@@ -106,11 +106,11 @@
@max_retries = @retry ? options.fetch(:max_retries, 3) : 0
@master = nil
@slaves = []
@queue = Queue.new
@lock = Monitor.new
- start_zk
+ setup_zk
build_clients
end
# Dispatches redis operations to master/slaves.
def method_missing(method, *args, &block)
@@ -119,100 +119,76 @@
else
super
end
end
- def respond_to?(method)
- redis_operation?(method) || super
+ # Determines whether or not an unknown method can be handled.
+ #
+ # @return [Boolean] indicates if the method can be handled
+ def respond_to_missing?(method)
+ redis_operation?(method)
end
+ # @return [String] a string representation of the client
def inspect
"#<RedisFailover::Client (master: #{master_name}, slaves: #{slave_names})>"
end
alias_method :to_s, :inspect
# Force a manual failover to a new server. A specific server can be specified
# via options. If no options are passed, a random slave will be selected as
# the candidate for the new master.
#
- # Options:
- # :host - the host of the failover candidate
- # :port - the port of the failover candidate
+ # @param [Hash] options the options used for manual failover
+ # @option options [String] :host the host of the failover candidate
+ # @option options [String] :port the port of the failover candidate
def manual_failover(options = {})
Manual.failover(zk, options)
self
end
private
- def zk
- @lock.synchronize { @zk }
+ # Sets up the underlying ZooKeeper connection.
+ def setup_zk
+ @zk = ZK.new(@zkservers)
+ @zk.watcher.register(@znode) { |event| handle_zk_event(event) }
+ @zk.on_expired_session { purge_clients }
+ @zk.on_connected { @zk.stat(@znode, :watch => true) }
+ @zk.stat(@znode, :watch => true)
+ update_znode_timestamp
end
- def start_zk
- @delivery_thread ||= Thread.new do
- while event = @queue.pop
- begin
- Proc === event ? event.call : handle_zk_event(event)
- rescue => ex
- logger.error("Error while handling event: #{ex.inspect}")
- logger.error(ex.backtrace.join("\n"))
- end
- end
- end
-
- reconnect_zk
- end
-
- def handle_session_established
- @lock.synchronize do
- @zk.watcher.register(@znode) do |event|
- @queue << event
- end
- @zk.on_expired_session do
- @queue << proc { reconnect_zk }
- end
- @zk.event_handler.register_state_handler(:connecting) do
- @queue << proc { handle_lost_connection }
- end
- @zk.on_connected do
- @zk.stat(@znode, :watch => true)
- end
- @zk.stat(@znode, :watch => true)
- end
- end
-
+ # Handles a ZK event.
+ #
+ # @param [ZK::Event] event the ZK event to handle
def handle_zk_event(event)
update_znode_timestamp
if event.node_created? || event.node_changed?
build_clients
elsif event.node_deleted?
purge_clients
- zk.stat(@znode, :watch => true)
+ @zk.stat(@znode, :watch => true)
else
logger.error("Unknown ZK node event: #{event.inspect}")
end
end
- def reconnect_zk
- @lock.synchronize do
- handle_lost_connection
- @zk.close! if @zk
- @zk = ZK.new(@zkservers)
- handle_session_established
- update_znode_timestamp
- end
- end
-
- def handle_lost_connection
- purge_clients
- end
-
+ # Determines if a method is a known redis operation.
+ #
+ # @param [Symbol] method the method to check
+ # @return [Boolean] true if redis operation, false otherwise
def redis_operation?(method)
Redis.public_instance_methods(false).include?(method)
end
+ # Dispatches a redis operation to a master or slave.
+ #
+ # @param [Symbol] method the method to dispatch
+ # @param [Array] args the arguments to pass to the method
+ # @param [Proc] block an optional block to pass to the method
+ # @return [Object] the result of dispatching the command
def dispatch(method, *args, &block)
unless recently_heard_from_node_manager?
@lock.synchronize do
reconnect_zk
build_clients
@@ -241,27 +217,38 @@
end
raise
end
end
+ # Returns the currently known master.
+ #
+ # @return [Redis] the Redis client for the current master
+ # @raise [NoMasterError] if no master is available
def master
if master = @lock.synchronize { @master }
verify_role!(master, :master)
return master
end
raise NoMasterError
end
+ # Returns a random slave from the list of known slaves.
+ #
+ # @note If there are no slaves, the master is returned.
+ # @return [Redis] the Redis client for the slave or master
+ # @raise [NoMasterError] if no master fallback is available
def slave
# pick a slave, if none available fallback to master
if slave = @lock.synchronize { @slaves.sample }
verify_role!(slave, :slave)
return slave
end
master
end
+ # Builds the Redis clients for the currently known master/slaves.
+ # The current master/slaves are fetched via ZooKeeper.
def build_clients
@lock.synchronize do
retried = false
begin
@@ -287,18 +274,25 @@
raise
end
end
end
+ # Fetches the known redis nodes from ZooKeeper.
+ #
+ # @return [Hash] the known master/slave redis servers
def fetch_nodes
- data = zk.get(@znode, :watch => true).first
+ data = @zk.get(@znode, :watch => true).first
nodes = symbolize_keys(decode(data))
logger.debug("Fetched nodes: #{nodes}")
nodes
end
+ # Builds new Redis clients for the specified nodes.
+ #
+ # @param [Array<String>] nodes the array of redis host:port pairs
+ # @return [Array<Redis>] the array of corresponding Redis clients
def new_clients_for(*nodes)
nodes.map do |node|
host, port = node.split(':')
opts = {:host => host, :port => port}
opts.update(:db => @db) if @db
@@ -309,72 +303,102 @@
end
client
end
end
+ # @return [String] a friendly name for current master
def master_name
address_for(@master) || 'none'
end
+ # @return [Array<String>] friendly names for current slaves
def slave_names
return 'none' if @slaves.empty?
addresses_for(@slaves).join(', ')
end
+ # Verifies the actual role for a redis node.
+ #
+ # @param [Redis] node the redis node to check
+ # @param [Symbol] role the role to verify
+ # @return [Symbol] the verified role
+ # @raise [InvalidNodeRoleError] if the role is invalid
def verify_role!(node, role)
current_role = node.info['role']
if current_role.to_sym != role
raise InvalidNodeRoleError.new(address_for(node), role, current_role)
end
role
end
+ # Ensures that the method is supported.
+ #
+ # @raise [UnsupportedOperationError] if the operation isn't supported
def verify_supported!(method)
if UNSUPPORTED_OPS.include?(method)
raise UnsupportedOperationError.new(method)
end
end
+ # Returns node addresses.
+ #
+ # @param [Array<Redis>] nodes the redis clients
+ # @return [Array<String>] the addresses for the nodes
def addresses_for(nodes)
nodes.map { |node| address_for(node) }
end
+ # Returns a node address.
+ #
+ # @param [Redis] node a redis client
+ # @return [String] the address for the node
def address_for(node)
return unless node
"#{node.client.host}:#{node.client.port}"
end
+ # Determines if the currently known redis servers is different
+ # from the nodes returned by ZooKeeper.
+ #
+ # @param [Array<String>] new_nodes the new redis nodes
+ # @return [Boolean] true if nodes are different, false otherwise
def nodes_changed?(new_nodes)
return true if address_for(@master) != new_nodes[:master]
return true if different?(addresses_for(@slaves), new_nodes[:slaves])
false
end
- def disconnect(*connections)
- connections.each do |conn|
+ # Disconnects one or more redis clients.
+ #
+ # @param [Array<Redis>] redis_clients the redis clients
+ def disconnect(*redis_clients)
+ redis_clients.each do |conn|
if conn
begin
conn.client.disconnect
rescue
# best effort
end
end
end
end
+ # Disconnects current redis clients and resets this client's view of the world.
def purge_clients
@lock.synchronize do
logger.info("Purging current redis clients")
disconnect(@master, *@slaves)
@master = nil
@slaves = []
end
end
+ # Updates timestamp when an event is received by the Node Manager.
def update_znode_timestamp
@last_znode_timestamp = Time.now
end
+ # @return [Boolean] indicates if we recently heard from the Node Manager
def recently_heard_from_node_manager?
return false unless @last_znode_timestamp
Time.now - @last_znode_timestamp <= ZNODE_UPDATE_TIMEOUT
end
end