lib/redis_failover/client.rb in redis_failover-0.7.0 vs lib/redis_failover/client.rb in redis_failover-0.8.0
- old
+ new
@@ -85,20 +85,18 @@
end
# Creates a new failover redis client.
#
# Options:
- #
# :zkservers - comma-separated ZooKeeper host:port pairs (required)
# :znode_path - the Znode path override for redis server list (optional)
# :password - password for redis nodes (optional)
# :db - db to use for redis nodes (optional)
# :namespace - namespace for redis nodes (optional)
# :logger - logger override (optional)
# :retry_failure - indicate if failures should be retried (default true)
# :max_retries - max retries for a failure (default 3)
- #
def initialize(options = {})
Util.logger = options[:logger] if options[:logger]
@zkservers = options.fetch(:zkservers) { raise ArgumentError, ':zkservers required'}
@znode = options[:znode_path] || Util::DEFAULT_ZNODE_PATH
@namespace = options[:namespace]
@@ -130,26 +128,40 @@
def inspect
"#<RedisFailover::Client (master: #{master_name}, slaves: #{slave_names})>"
end
alias_method :to_s, :inspect
+ # Force a manual failover to a new server. A specific server can be specified
+ # via options. If no options are passed, a random slave will be selected as
+ # the candidate for the new master.
+ #
+ # Options:
+ # :host - the host of the failover candidate
+ # :port - the port of the failover candidate
+ def manual_failover(options = {})
+ Manual.failover(zk, options)
+ self
+ end
+
private
def zk
@lock.synchronize { @zk }
end
def start_zk
@delivery_thread ||= Thread.new do
while event = @queue.pop
- if event.is_a?(Proc)
- event.call
- else
- handle_zk_event(event)
+ begin
+ Proc === event ? event.call : handle_zk_event(event)
+ rescue => ex
+ logger.error("Error while handling event: #{ex.inspect}")
+ logger.error(ex.backtrace.join("\n"))
end
end
end
+
reconnect_zk
end
def handle_session_established
@lock.synchronize do
@@ -163,10 +175,11 @@
@queue << proc { handle_lost_connection }
end
@zk.on_connected do
@zk.stat(@znode, :watch => true)
end
+ @zk.stat(@znode, :watch => true)
end
end
def handle_zk_event(event)
update_znode_timestamp
@@ -179,17 +192,17 @@
logger.error("Unknown ZK node event: #{event.inspect}")
end
end
def reconnect_zk
- handle_lost_connection
@lock.synchronize do
+ handle_lost_connection
@zk.close! if @zk
@zk = ZK.new(@zkservers)
+ handle_session_established
+ update_znode_timestamp
end
- handle_session_established
- update_znode_timestamp
end
def handle_lost_connection
purge_clients
end
@@ -198,11 +211,13 @@
Redis.public_instance_methods(false).include?(method)
end
def dispatch(method, *args, &block)
unless recently_heard_from_node_manager?
- purge_clients
- raise MissingNodeManagerError.new(ZNODE_UPDATE_TIMEOUT)
+ @lock.synchronize do
+ reconnect_zk
+ build_clients
+ end
end
verify_supported!(method)
tries = 0
begin