lib/redis_failover/client.rb in redis_failover-0.6.0 vs lib/redis_failover/client.rb in redis_failover-0.7.0

- old
+ new

@@ -103,15 +103,16 @@ @znode = options[:znode_path] || Util::DEFAULT_ZNODE_PATH @namespace = options[:namespace] @password = options[:password] @db = options[:db] @retry = options[:retry_failure] || true - @max_retries = @retry ? options.fetch(:max_retries, 3) : 1 + @max_retries = @retry ? options.fetch(:max_retries, 3) : 0 @master = nil @slaves = [] + @queue = Queue.new @lock = Monitor.new - setup_zookeeper_client + start_zk build_clients end # Dispatches redis operations to master/slaves. def method_missing(method, *args, &block) @@ -131,44 +132,70 @@ end alias_method :to_s, :inspect private - def setup_zookeeper_client - @zkclient = ZkClient.new(@zkservers) do |client| - # when session expires, purge client list - client.on_session_expiration do - purge_clients - end + def zk + @lock.synchronize { @zk } + end - # when we are disconnected, purge client list - client.event_handler.register_state_handler(:connecting) do - purge_clients + def start_zk + @delivery_thread ||= Thread.new do + while event = @queue.pop + if event.is_a?(Proc) + event.call + else + handle_zk_event(event) + end end + end + reconnect_zk + end - # when session is recovered, watch again - client.on_session_recovered do - client.stat(@znode, :watch => true) + def handle_session_established + @lock.synchronize do + @zk.watcher.register(@znode) do |event| + @queue << event end - - # register a watcher for future changes - client.watcher.register(@znode) do |event| - if event.node_created? || event.node_changed? - update_znode_timestamp - build_clients - elsif event.node_deleted? - update_znode_timestamp - purge_clients - client.stat(@znode, :watch => true) - else - logger.error("Unknown ZK node event: #{event.inspect}") - end + @zk.on_expired_session do + @queue << proc { reconnect_zk } end + @zk.event_handler.register_state_handler(:connecting) do + @queue << proc { handle_lost_connection } + end + @zk.on_connected do + @zk.stat(@znode, :watch => true) + end end + end + + def handle_zk_event(event) update_znode_timestamp + if event.node_created? || event.node_changed? + build_clients + elsif event.node_deleted? + purge_clients + zk.stat(@znode, :watch => true) + else + logger.error("Unknown ZK node event: #{event.inspect}") + end end + def reconnect_zk + handle_lost_connection + @lock.synchronize do + @zk.close! if @zk + @zk = ZK.new(@zkservers) + end + handle_session_established + update_znode_timestamp + end + + def handle_lost_connection + purge_clients + end + def redis_operation?(method) Redis.public_instance_methods(false).include?(method) end def dispatch(method, *args, &block) @@ -195,11 +222,10 @@ tries += 1 build_clients sleep(RETRY_WAIT_TIME) retry end - raise end end def master @@ -219,11 +245,11 @@ master end def build_clients @lock.synchronize do - tries = 0 + retried = false begin nodes = fetch_nodes return unless nodes_changed?(nodes) @@ -231,27 +257,26 @@ logger.info("Building new clients for nodes #{nodes}") new_master = new_clients_for(nodes[:master]).first if nodes[:master] new_slaves = new_clients_for(*nodes[:slaves]) @master = new_master @slaves = new_slaves - rescue StandardError, *CONNECTIVITY_ERRORS => ex - purge_clients - logger.error("Failed to fetch nodes from #{@zkservers} - #{ex.inspect}") + rescue ZK::Exceptions::InterruptedSession => ex + logger.error("ZK error while attempting to build clients: #{ex.inspect}") logger.error(ex.backtrace.join("\n")) - if tries < @max_retries - tries += 1 - sleep(RETRY_WAIT_TIME) + # when ZK is disconnected, retry once + unless retried + reconnect_zk + retried = true retry end - raise end end end def fetch_nodes - data = @zkclient.get(@znode, :watch => true).first + data = zk.get(@znode, :watch => true).first nodes = symbolize_keys(decode(data)) logger.debug("Fetched nodes: #{nodes}") nodes end