Diff of lib/zk/locker/locker_base.rb between zk-1.8.0 and zk-1.9.0
- lines removed (present only in zk-1.8.0)
+ lines added (present only in zk-1.9.0)
@@ -100,10 +100,15 @@
# @return [String] last path component of our lock path
#   (nil when no lock node has been created yet — `lock_path and ...`
#   short-circuits to nil)
def lock_basename
synchronize { lock_path and File.basename(lock_path) }
end
+ # @private
+ # @return [Integer] the sequence number extracted from our lock_path via
+ #   #digit_from, or nil if we have not created a lock node yet
+ def lock_number
+ synchronize { lock_path and digit_from(lock_path) }
+ end
+
# returns our current idea of whether or not we hold the lock, which does
# not actually check the state on the server.
#
# The reason for the equivocation around _thinking_ we hold the lock is
# to contrast our current state and the actual state on the server. If you
@@ -286,14 +291,10 @@
rescue LockAssertionFailedError
false
end
private
- def lock_with_opts_hash(opts={})
- raise NotImplementedError
- end
-
# runs the given block while holding @mutex; guards access to shared lock
# state (e.g. lock_path, @locked) across threads
def synchronize
@mutex.synchronize { yield }
end
def digit_from(path)
@@ -316,17 +317,10 @@
zk.mkdir_p(@root_lock_path)
rescue NoNode
retry
end
- # performs the checks that (according to the recipe) mean that we hold
- # the lock. used by (#assert!)
- #
- def got_lock?
- raise NotImplementedError
- end
-
# prefix is the string that will appear in front of the sequence num,
# defaults to 'lock'
#
# this method also saves the stat of root_lock_path at the time of creation
# to ensure we don't accidentally remove a lock we don't own. see
@@ -392,9 +386,84 @@
@lock_path = @parent_stat = nil
end
rval
+ end
+
+ # @private
+ # Selects the subset of ordered_lock_children whose sequence number
+ # (#digit_from) is lower than ours (#lock_number).
+ #
+ # If we have no lock_path yet, returns all children unfiltered.
+ #
+ # @param watch [true,false] passed through to ordered_lock_children
+ # @return [Array<String>] lock node names with lower sequence numbers
+ def lower_lock_names(watch=false)
+ olc = ordered_lock_children(watch)
+ return olc unless lock_path
+
+ olc.select do |lock|
+ digit_from(lock) < lock_number
+ end
+ end
+
+ # for write locks & semaphores, this will be all locks lower than us
+ # for read locks, this will be all write-locks lower than us.
+ # @return [Array] an array of string node paths
+ # @raise [NotImplementedError] abstract; subclasses must override
+ def blocking_locks
+ raise NotImplementedError
+ end
+
+ # @return [String] the prefix that appears before the sequence number in
+ #   our lock node's name (passed to create_lock_path!)
+ # @raise [NotImplementedError] abstract; subclasses must override
+ def lock_prefix
+ raise NotImplementedError
+ end
+
+ # performs the checks that (according to the recipe) mean that we hold
+ # the lock. used by (#assert!)
+ #
+ # true only when we have created a lock node AND no blocking locks remain
+ # @return [true,false,nil] nil (falsy) when lock_path is not set
+ def got_lock?
+ lock_path and blocking_locks.empty?
+ end
+
+ # for write locks & read locks, this will be zero since #blocking_locks
+ # accounts for all locks that could block at all.
+ # for semaphores, this is one less than the semaphore size.
+ # @private
+ # @return [Integer]
+ def allowed_blocking_locks_remaining
+ 0
+ end
+
+ # @private
+ # @return [Array<String>] the #blocking_locks node names expanded to
+ #   absolute paths under root_lock_path
+ def blocking_locks_full_paths
+ blocking_locks.map { |partial| "#{root_lock_path}/#{partial}"}
+ end
+
+ # Creates our lock node under lock_prefix and attempts to acquire the lock.
+ #
+ # @param opts [Hash] wrapped in LockOptions; :blocking and :timeout are read
+ # @return [true,false] true when the lock is acquired (also sets @locked),
+ #   false when non-blocking and the lock was not available
+ #
+ # NOTE: the ensure clause removes our lock node whenever we did not end up
+ # holding the lock (including when block_until_lock! raises, e.g. timeout)
+ def lock_with_opts_hash(opts)
+ create_lock_path!(lock_prefix)
+
+ lock_opts = LockOptions.new(opts)
+
+ if got_lock? or (lock_opts.blocking? and block_until_lock!(:timeout => lock_opts.timeout))
+ @mutex.synchronize { @locked = true }
+ else
+ false
+ end
+ ensure
+ cleanup_lock_path! unless @mutex.synchronize { @locked }
+ end
+
+ # Blocks the calling thread until the nodes in #blocking_locks_full_paths
+ # have been deleted (down to the #allowed_blocking_locks_remaining
+ # threshold), i.e. until we may consider the lock held.
+ #
+ # @param opts [Hash] forwarded to NodeDeletionWatcher#block_until_deleted
+ #   (:timeout is supplied by the caller)
+ # @return [true] always true once the watcher unblocks; the watcher is
+ #   expected to raise on timeout — TODO confirm against NodeDeletionWatcher
+ def block_until_lock!(opts={})
+ paths = blocking_locks_full_paths
+
+ logger.debug { "#{self.class}\##{__method__} paths=#{paths.inspect}" }
+
+ @mutex.synchronize do
+ logger.debug { "assigning the @node_deletion_watcher" }
+ ndw_options = {:threshold => allowed_blocking_locks_remaining}
+ @node_deletion_watcher = NodeDeletionWatcher.new(zk, paths, ndw_options)
+ logger.debug { "broadcasting" }
+ # wake any threads waiting on @cond for @node_deletion_watcher to be set
+ @cond.broadcast
+ end
+
+ logger.debug { "calling block_until_deleted" }
+ # yield the scheduler so threads woken by the broadcast can run first —
+ # NOTE(review): appears advisory only; confirm it is not load-bearing
+ Thread.pass
+
+ @node_deletion_watcher.block_until_deleted(opts)
+ true
+ end
end # LockerBase
end # Locker
end # ZK