lib/redis_failover/client.rb in redis_failover-0.6.0 vs lib/redis_failover/client.rb in redis_failover-0.7.0
- old
+ new
@@ -103,15 +103,16 @@
@znode = options[:znode_path] || Util::DEFAULT_ZNODE_PATH
@namespace = options[:namespace]
@password = options[:password]
@db = options[:db]
@retry = options[:retry_failure] || true
- @max_retries = @retry ? options.fetch(:max_retries, 3) : 1
+ @max_retries = @retry ? options.fetch(:max_retries, 3) : 0
@master = nil
@slaves = []
+ @queue = Queue.new
@lock = Monitor.new
- setup_zookeeper_client
+ start_zk
build_clients
end
# Dispatches redis operations to master/slaves.
def method_missing(method, *args, &block)
@@ -131,44 +132,70 @@
end
alias_method :to_s, :inspect
private
- def setup_zookeeper_client
- @zkclient = ZkClient.new(@zkservers) do |client|
- # when session expires, purge client list
- client.on_session_expiration do
- purge_clients
- end
+ def zk
+ @lock.synchronize { @zk }
+ end
- # when we are disconnected, purge client list
- client.event_handler.register_state_handler(:connecting) do
- purge_clients
+ def start_zk
+ @delivery_thread ||= Thread.new do
+ while event = @queue.pop
+ if event.is_a?(Proc)
+ event.call
+ else
+ handle_zk_event(event)
+ end
end
+ end
+ reconnect_zk
+ end
- # when session is recovered, watch again
- client.on_session_recovered do
- client.stat(@znode, :watch => true)
+ def handle_session_established
+ @lock.synchronize do
+ @zk.watcher.register(@znode) do |event|
+ @queue << event
end
-
- # register a watcher for future changes
- client.watcher.register(@znode) do |event|
- if event.node_created? || event.node_changed?
- update_znode_timestamp
- build_clients
- elsif event.node_deleted?
- update_znode_timestamp
- purge_clients
- client.stat(@znode, :watch => true)
- else
- logger.error("Unknown ZK node event: #{event.inspect}")
- end
+ @zk.on_expired_session do
+ @queue << proc { reconnect_zk }
end
+ @zk.event_handler.register_state_handler(:connecting) do
+ @queue << proc { handle_lost_connection }
+ end
+ @zk.on_connected do
+ @zk.stat(@znode, :watch => true)
+ end
end
+ end
+
+ def handle_zk_event(event)
update_znode_timestamp
+ if event.node_created? || event.node_changed?
+ build_clients
+ elsif event.node_deleted?
+ purge_clients
+ zk.stat(@znode, :watch => true)
+ else
+ logger.error("Unknown ZK node event: #{event.inspect}")
+ end
end
+ def reconnect_zk
+ handle_lost_connection
+ @lock.synchronize do
+ @zk.close! if @zk
+ @zk = ZK.new(@zkservers)
+ end
+ handle_session_established
+ update_znode_timestamp
+ end
+
+ def handle_lost_connection
+ purge_clients
+ end
+
def redis_operation?(method)
Redis.public_instance_methods(false).include?(method)
end
def dispatch(method, *args, &block)
@@ -195,11 +222,10 @@
tries += 1
build_clients
sleep(RETRY_WAIT_TIME)
retry
end
-
raise
end
end
def master
@@ -219,11 +245,11 @@
master
end
def build_clients
@lock.synchronize do
- tries = 0
+ retried = false
begin
nodes = fetch_nodes
return unless nodes_changed?(nodes)
@@ -231,27 +257,26 @@
logger.info("Building new clients for nodes #{nodes}")
new_master = new_clients_for(nodes[:master]).first if nodes[:master]
new_slaves = new_clients_for(*nodes[:slaves])
@master = new_master
@slaves = new_slaves
- rescue StandardError, *CONNECTIVITY_ERRORS => ex
- purge_clients
- logger.error("Failed to fetch nodes from #{@zkservers} - #{ex.inspect}")
+ rescue ZK::Exceptions::InterruptedSession => ex
+ logger.error("ZK error while attempting to build clients: #{ex.inspect}")
logger.error(ex.backtrace.join("\n"))
- if tries < @max_retries
- tries += 1
- sleep(RETRY_WAIT_TIME)
+ # when ZK is disconnected, retry once
+ unless retried
+ reconnect_zk
+ retried = true
retry
end
-
raise
end
end
end
def fetch_nodes
- data = @zkclient.get(@znode, :watch => true).first
+ data = zk.get(@znode, :watch => true).first
nodes = symbolize_keys(decode(data))
logger.debug("Fetched nodes: #{nodes}")
nodes
end