lib/redis_failover/client.rb in redis_failover-0.7.0 vs lib/redis_failover/client.rb in redis_failover-0.8.0

- old
+ new

@@ -85,20 +85,18 @@ end # Creates a new failover redis client. # # Options: - # # :zkservers - comma-separated ZooKeeper host:port pairs (required) # :znode_path - the Znode path override for redis server list (optional) # :password - password for redis nodes (optional) # :db - db to use for redis nodes (optional) # :namespace - namespace for redis nodes (optional) # :logger - logger override (optional) # :retry_failure - indicate if failures should be retried (default true) # :max_retries - max retries for a failure (default 3) - # def initialize(options = {}) Util.logger = options[:logger] if options[:logger] @zkservers = options.fetch(:zkservers) { raise ArgumentError, ':zkservers required'} @znode = options[:znode_path] || Util::DEFAULT_ZNODE_PATH @namespace = options[:namespace] @@ -130,26 +128,40 @@ def inspect "#<RedisFailover::Client (master: #{master_name}, slaves: #{slave_names})>" end alias_method :to_s, :inspect + # Force a manual failover to a new server. A specific server can be specified + # via options. If no options are passed, a random slave will be selected as + # the candidate for the new master. + # + # Options: + # :host - the host of the failover candidate + # :port - the port of the failover candidate + def manual_failover(options = {}) + Manual.failover(zk, options) + self + end + private def zk @lock.synchronize { @zk } end def start_zk @delivery_thread ||= Thread.new do while event = @queue.pop - if event.is_a?(Proc) - event.call - else - handle_zk_event(event) + begin + Proc === event ? event.call : handle_zk_event(event) + rescue => ex + logger.error("Error while handling event: #{ex.inspect}") + logger.error(ex.backtrace.join("\n")) end end end + reconnect_zk end def handle_session_established @lock.synchronize do @@ -163,10 +175,11 @@ @queue << proc { handle_lost_connection } end @zk.on_connected do @zk.stat(@znode, :watch => true) end + @zk.stat(@znode, :watch => true) end end def handle_zk_event(event) update_znode_timestamp @@ -179,17 +192,17 @@ logger.error("Unknown ZK node event: #{event.inspect}") end end def reconnect_zk - handle_lost_connection @lock.synchronize do + handle_lost_connection @zk.close! if @zk @zk = ZK.new(@zkservers) + handle_session_established + update_znode_timestamp end - handle_session_established - update_znode_timestamp end def handle_lost_connection purge_clients end @@ -198,11 +211,13 @@ Redis.public_instance_methods(false).include?(method) end def dispatch(method, *args, &block) unless recently_heard_from_node_manager? - purge_clients - raise MissingNodeManagerError.new(ZNODE_UPDATE_TIMEOUT) + @lock.synchronize do + reconnect_zk + build_clients + end end verify_supported!(method) tries = 0 begin