Sha256: abfe08e594217c9624aff48e58b00e2b3d37688d6ff1787c112a6879a856c325
Contents?: true
Size: 1.98 KB
Versions: 2
Compression:
Stored size: 1.98 KB
Contents
module RedisFailover
  # ZkClient is a thin wrapper over the ZK client to gracefully handle
  # reconnects when a session expires.
  #
  # Every delegated operation runs through #perform_with_reconnect, which
  # rescues a SessionExpired error, reopens the underlying client and retries
  # the operation up to MAX_RECONNECTS times.
  class ZkClient
    include Util

    # Maximum number of reconnect attempts made for a single operation.
    MAX_RECONNECTS = 3

    # Seconds to pause after reopening the client before retrying.
    RECONNECT_DELAY = 2

    # Stores the server specification and builds the underlying client.
    #
    # servers - passed unchanged to ZK.new.
    def initialize(servers)
      @servers = servers
      @lock = Mutex.new
      build_client
    end

    # Registers a block with the client's on_expired_session hook and also
    # remembers it so #perform_with_reconnect can invoke it directly when it
    # rescues a session expiration.
    #
    # Returns the given block.
    def on_session_expiration(&block)
      @client.on_expired_session { block.call }
      @on_session_expiration = block
    end

    # Registers a block with the client's on_connected hook and also remembers
    # it so #perform_with_reconnect can invoke it directly after the client
    # has been rebuilt.
    #
    # Returns the given block.
    def on_session_recovered(&block)
      @client.on_connected { block.call }
      @on_session_recovered = block
    end

    # The methods below forward their arguments and block unchanged to the
    # same-named method on the underlying client, wrapped in reconnect
    # handling.

    def get(*args, &block)
      perform_with_reconnect { @client.get(*args, &block) }
    end

    def set(*args, &block)
      perform_with_reconnect { @client.set(*args, &block) }
    end

    def watcher(*args, &block)
      perform_with_reconnect { @client.watcher(*args, &block) }
    end

    def event_handler(*args, &block)
      perform_with_reconnect { @client.event_handler(*args, &block) }
    end

    def stat(*args, &block)
      perform_with_reconnect { @client.stat(*args, &block) }
    end

    def create(*args, &block)
      perform_with_reconnect { @client.create(*args, &block) }
    end

    def delete(*args, &block)
      perform_with_reconnect { @client.delete(*args, &block) }
    end

    private

    # Yields to the given block, and on SessionExpired rebuilds the client and
    # runs the block again, at most MAX_RECONNECTS times. Once the attempts
    # are used up the SessionExpired error is re-raised to the caller.
    #
    # Returns whatever the block returns.
    def perform_with_reconnect
      tries = 0
      begin
        yield
      rescue ZookeeperExceptions::ZookeeperException::SessionExpired
        if tries >= MAX_RECONNECTS
          # No rebuild happens on this path, so say so instead of claiming one.
          logger.info("Zookeeper client session expired, giving up after #{tries} reconnect attempts.")
          raise
        end

        logger.info("Zookeeper client session expired, rebuilding client.")
        tries += 1
        @on_session_expiration.call if @on_session_expiration
        build_client
        @on_session_recovered.call if @on_session_recovered

        # Pause and retry are separate statements so the retry never depends
        # on the truthiness of sleep's return value.
        sleep(RECONNECT_DELAY)
        retry
      end
    end

    # Creates the client on first use and reopens the existing one afterwards,
    # holding @lock for the duration.
    def build_client
      @lock.synchronize do
        if @client
          @client.reopen
        else
          @client = ZK.new(@servers)
        end
        logger.info("Communicating with zookeeper servers #{@servers}")
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
redis_failover-0.5.2 | lib/redis_failover/zk_client.rb |
redis_failover-0.5.1 | lib/redis_failover/zk_client.rb |