lib/zk/locker/locker_base.rb in zk-1.8.0 vs lib/zk/locker/locker_base.rb in zk-1.9.0

- old
+ new

@@ -100,10 +100,15 @@ # @return [String] last path component of our lock path def lock_basename synchronize { lock_path and File.basename(lock_path) } end + # @private + def lock_number + synchronize { lock_path and digit_from(lock_path) } + end + # returns our current idea of whether or not we hold the lock, which does # not actually check the state on the server. # # The reason for the equivocation around _thinking_ we hold the lock is # to contrast our current state and the actual state on the server. If you @@ -286,14 +291,10 @@ rescue LockAssertionFailedError false end private - def lock_with_opts_hash(opts={}) - raise NotImplementedError - end - def synchronize @mutex.synchronize { yield } end def digit_from(path) @@ -316,17 +317,10 @@ zk.mkdir_p(@root_lock_path) rescue NoNode retry end - # performs the checks that (according to the recipe) mean that we hold - # the lock. used by (#assert!) - # - def got_lock? - raise NotImplementedError - end - # prefix is the string that will appear in front of the sequence num, # defaults to 'lock' # # this method also saves the stat of root_lock_path at the time of creation # to ensure we don't accidentally remove a lock we don't own. see @@ -392,9 +386,84 @@ @lock_path = @parent_stat = nil end rval + end + + # @private + def lower_lock_names(watch=false) + olc = ordered_lock_children(watch) + return olc unless lock_path + + olc.select do |lock| + digit_from(lock) < lock_number + end + end + + # for write locks & semaphores, this will be all locks lower than us + # for read locks, this will be all write-locks lower than us. + # @return [Array] an array of string node paths + def blocking_locks + raise NotImplementedError + end + + def lock_prefix + raise NotImplementedError + end + + # performs the checks that (according to the recipe) mean that we hold + # the lock. used by (#assert!) + # + def got_lock? + lock_path and blocking_locks.empty? + end + + # for write locks & read locks, this will be zero since #blocking_locks + # accounts for all locks that could block at all. + # for semaphores, this is one less than the semaphore size. + # @private + # @returns [Integer] + def allowed_blocking_locks_remaining + 0 + end + + def blocking_locks_full_paths + blocking_locks.map { |partial| "#{root_lock_path}/#{partial}"} + end + + def lock_with_opts_hash(opts) + create_lock_path!(lock_prefix) + + lock_opts = LockOptions.new(opts) + + if got_lock? or (lock_opts.blocking? and block_until_lock!(:timeout => lock_opts.timeout)) + @mutex.synchronize { @locked = true } + else + false + end + ensure + cleanup_lock_path! unless @mutex.synchronize { @locked } + end + + def block_until_lock!(opts={}) + paths = blocking_locks_full_paths + + logger.debug { "#{self.class}\##{__method__} paths=#{paths.inspect}" } + + @mutex.synchronize do + logger.debug { "assigning the @node_deletion_watcher" } + ndw_options = {:threshold => allowed_blocking_locks_remaining} + @node_deletion_watcher = NodeDeletionWatcher.new(zk, paths, ndw_options) + logger.debug { "broadcasting" } + @cond.broadcast + end + + logger.debug { "calling block_until_deleted" } + Thread.pass + + @node_deletion_watcher.block_until_deleted(opts) + true end end # LockerBase end # Locker end # ZK