lib/redis_failover/client.rb in redis_failover-0.2.0 vs lib/redis_failover/client.rb in redis_failover-0.3.0
- old
+ new
@@ -84,15 +84,15 @@
@namespace = options[:namespace]
@password = options[:password]
@retry = options[:retry_failure] || true
@max_retries = @retry ? options.fetch(:max_retries, 3) : 0
@server_url = "http://#{options[:host]}:#{options[:port]}/redis_servers"
- @redis_servers = nil
@master = nil
@slaves = []
@lock = Mutex.new
build_clients
+ start_background_monitor
end
def method_missing(method, *args, &block)
if redis_operation?(method)
dispatch(method, *args, &block)
@@ -104,11 +104,11 @@
def respond_to?(method)
redis_operation?(method) || super
end
def inspect
- "#<RedisFailover::Client - master: #{master_info}, slaves: #{slaves_info})>"
+ "#<RedisFailover::Client - master: #{master_name}, slaves: #{slave_names})>"
end
alias_method :to_s, :inspect
private
@@ -139,13 +139,14 @@
raise
end
end
def master
- if @master
- verify_role!(@master, :master)
- return @master
+ master = @master
+ if master
+ verify_role!(master, :master)
+ return master
end
raise NoMasterError
end
def slave
@@ -160,20 +161,23 @@
def build_clients
@lock.synchronize do
tries = 0
begin
- logger.info('Attempting to fetch nodes and build redis clients.')
- servers = fetch_redis_servers
- master = new_clients_for(servers[:master]).first if servers[:master]
- slaves = new_clients_for(*servers[:slaves])
+ logger.info('Checking for new redis nodes.')
+ nodes = fetch_nodes
+ return unless nodes_changed?(nodes)
+ logger.info('Node change detected, rebuilding clients.')
+ master = new_clients_for(nodes[:master]).first if nodes[:master]
+ slaves = new_clients_for(*nodes[:slaves])
+
# once clients are successfully created, swap the references
@master = master
@slaves = slaves
rescue => ex
- logger.error("Failed to fetch servers from #{@server_url} - #{ex.message}")
+ logger.error("Failed to fetch nodes from #{@server_url} - #{ex.message}")
logger.error(ex.backtrace.join("\n"))
if tries < @max_retries
tries += 1
sleep(RETRY_WAIT_TIME) && retry
@@ -182,15 +186,15 @@
raise FailoverServerUnreachableError.new(@server_url)
end
end
end
- def fetch_redis_servers
+ def fetch_nodes
open(@server_url) do |io|
- servers = symbolize_keys(MultiJson.decode(io))
- logger.info("Fetched servers: #{servers}")
- servers
+ nodes = symbolize_keys(MultiJson.decode(io))
+ logger.info("Fetched nodes: #{nodes}")
+ nodes
end
end
def new_clients_for(*nodes)
nodes.map do |node|
@@ -201,28 +205,54 @@
end
client
end
end
- def master_info
- return "none" unless @master
- name_for(@master)
+ def master_name
+ address_for(@master) || 'none'
end
- def slaves_info
- return "none" if @slaves.empty?
- @slaves.map { |slave| name_for(slave) }.join(', ')
+ def slave_names
+ return 'none' if @slaves.empty?
+ addresses_for(@slaves).join(', ')
end
def verify_role!(node, role)
current_role = node.info['role']
if current_role.to_sym != role
- raise InvalidNodeRoleError.new(name_for(node), role, current_role)
+ raise InvalidNodeRoleError.new(address_for(node), role, current_role)
end
role
end
- def name_for(node)
+ def addresses_for(nodes)
+ nodes.map { |node| address_for(node) }
+ end
+
+ def address_for(node)
+ return unless node
"#{node.client.host}:#{node.client.port}"
+ end
+
+ def nodes_changed?(new_nodes)
+ return true if address_for(@master) != new_nodes[:master]
+ return true if different?(addresses_for(@slaves), new_nodes[:slaves])
+ false
+ end
+
+ # Spawns a background thread to periodically fetch the latest
+ # set of nodes from the redis failover server.
+ def start_background_monitor
+ Thread.new do
+ loop do
+ sleep(10)
+ begin
+ build_clients
+ rescue => ex
+ logger.error("Failed to poll for new nodes from #{@server_url} - #{ex.message}")
+ logger.error(ex.backtrace.join("\n"))
+ end
+ end
+ end
end
end
end