lib/redis_failover/client.rb in redis_failover-0.4.0 vs lib/redis_failover/client.rb in redis_failover-0.5.0
- old
+ new
@@ -1,15 +1,13 @@
require 'set'
-require 'open-uri'
module RedisFailover
# Redis failover-aware client.
class Client
include Util
RETRY_WAIT_TIME = 3
- REDIS_ERRORS = Errno.constants.map { |c| Errno.const_get(c) }.freeze
REDIS_READ_OPS = Set[
:echo,
:exists,
:get,
:getbit,
@@ -65,33 +63,31 @@
# Creates a new failover redis client.
#
# Options:
#
- # :host - redis failover server host (required)
- # :port - redis failover server port (required)
- # :password - optional password for redis nodes
- # :namespace - optional namespace for redis nodes
- # :logger - optional logger override
+ # :zkservers - comma-separated zookeeper host:port pairs (required)
+ # :znode_path - the Znode path override for redis server list (optional)
+ # :password - password for redis nodes (optional)
+ # :namespace - namespace for redis nodes (optional)
+ # :logger - logger override (optional)
# :retry_failure - indicate if failures should be retried (default true)
# :max_retries - max retries for a failure (default 5)
#
def initialize(options = {})
- unless options.values_at(:host, :port).all?
- raise ArgumentError, ':host and :port options required'
- end
-
Util.logger = options[:logger] if options[:logger]
+ @zkservers = options.fetch(:zkservers) { raise ArgumentError, ':zkservers required'}
+ @znode = options[:znode_path] || Util::DEFAULT_ZNODE_PATH
@namespace = options[:namespace]
@password = options[:password]
@retry = options[:retry_failure] || true
@max_retries = @retry ? options.fetch(:max_retries, 3) : 0
- @server_url = "http://#{options[:host]}:#{options[:port]}/redis_servers"
@master = nil
@slaves = []
+ @lock = Mutex.new
+ setup_zookeeper_client
build_clients
- start_background_monitor
end
def method_missing(method, *args, &block)
if redis_operation?(method)
dispatch(method, *args, &block)
@@ -109,10 +105,34 @@
end
alias_method :to_s, :inspect
private
+ def setup_zookeeper_client
+ @zkclient = ZkClient.new(@zkservers)
+
+ # when session expires / we are disconnected, purge client list
+ @zkclient.on_session_expiration do
+ @lock.synchronize { purge_clients }
+ end
+ @zkclient.event_handler.register_state_handler(:connecting) do
+ @lock.synchronize { purge_clients }
+ end
+
+ # register a watcher for future changes
+ @zkclient.watcher.register(@znode) do |event|
+ if event.node_created? || event.node_changed?
+ build_clients
+ elsif event.node_deleted?
+ @zkclient.stat( @znode, :watch => true)
+ @lock.synchronize { purge_clients }
+ else
+ logger.error("Unknown ZK node event: #{event.inspect}")
+ end
+ end
+ end
+
def redis_operation?(method)
Redis.public_instance_methods(false).include?(method)
end
def dispatch(method, *args, &block)
@@ -125,75 +145,75 @@
slave.send(method, *args, &block)
else
# direct everything else to master
master.send(method, *args, &block)
end
- rescue Error, *REDIS_ERRORS
- logger.error("No suitable node available for operation `#{method}.`")
- build_clients
-
+ rescue *ALL_ERRORS => ex
+ logger.error("Error while handling operation `#{method}` - #{ex.message}")
if tries < @max_retries
tries += 1
+ build_clients
sleep(RETRY_WAIT_TIME) && retry
end
raise
end
end
def master
- master = @master
+ master = @lock.synchronize { @master }
if master
verify_role!(master, :master)
return master
end
raise NoMasterError
end
def slave
# pick a slave, if none available fallback to master
- if slave = @slaves.sample
+ if slave = @lock.synchronize { @slaves.sample }
verify_role!(slave, :slave)
return slave
end
master
end
def build_clients
- tries = 0
+ @lock.synchronize do
+ tries = 0
- begin
- logger.info('Checking for new redis nodes.')
- nodes = fetch_nodes
- return unless nodes_changed?(nodes)
+ begin
+ nodes = fetch_nodes
+ return unless nodes_changed?(nodes)
- logger.info('Node change detected, rebuilding clients.')
- master = new_clients_for(nodes[:master]).first if nodes[:master]
- slaves = new_clients_for(*nodes[:slaves])
+ purge_clients
+ logger.info("Building new clients for nodes #{nodes}")
+ new_master = new_clients_for(nodes[:master]).first if nodes[:master]
+ new_slaves = new_clients_for(*nodes[:slaves])
+ @master = new_master
+ @slaves = new_slaves
+ rescue *ALL_ERRORS => ex
+ purge_clients
+ logger.error("Failed to fetch nodes from #{@zkservers} - #{ex.message}")
+ logger.error(ex.backtrace.join("\n"))
- # once clients are successfully created, swap the references
- @master = master
- @slaves = slaves
- rescue => ex
- logger.error("Failed to fetch nodes from #{@server_url} - #{ex.message}")
- logger.error(ex.backtrace.join("\n"))
+ if tries < @max_retries
+ tries += 1
+ sleep(RETRY_WAIT_TIME) && retry
+ end
- if tries < @max_retries
- tries += 1
- sleep(RETRY_WAIT_TIME) && retry
+ raise
end
-
- raise FailoverServerUnavailableError.new(@server_url)
end
end
def fetch_nodes
- open(@server_url) do |io|
- nodes = symbolize_keys(MultiJson.decode(io))
- logger.info("Fetched nodes: #{nodes}")
- nodes
- end
+ data = @zkclient.get(@znode, :watch => true).first
+ nodes = symbolize_keys(decode(data))
+ logger.debug("Fetched nodes: #{nodes}")
+
+ nodes
end
def new_clients_for(*nodes)
nodes.map do |node|
host, port = node.split(':')
@@ -241,22 +261,25 @@
return true if address_for(@master) != new_nodes[:master]
return true if different?(addresses_for(@slaves), new_nodes[:slaves])
false
end
- # Spawns a background thread to periodically fetch the latest
- # set of nodes from the redis failover server.
- def start_background_monitor
- Thread.new do
- loop do
- sleep(10)
+ def disconnect(*connections)
+ connections.each do |conn|
+ if conn
begin
- build_clients
- rescue => ex
- logger.error("Failed to poll for new nodes from #{@server_url} - #{ex.message}")
- logger.error(ex.backtrace.join("\n"))
+ conn.client.disconnect
+ rescue
+ # best effort
end
end
end
+ end
+
+ def purge_clients
+ logger.info("Purging current redis clients")
+ disconnect(@master, *@slaves)
+ @master = nil
+ @slaves = []
end
end
end