require 'thread' require 'concurrent/atomic/atomic_reference' require 'concurrent/errors' require 'concurrent/synchronization' require 'concurrent/atomic/thread_local_var' module Concurrent # Re-entrant read-write lock implementation # # Allows any number of concurrent readers, but only one concurrent writer # (And while the "write" lock is taken, no read locks can be obtained either. # Hence, the write lock can also be called an "exclusive" lock.) # # If another thread has taken a read lock, any thread which wants a write lock # will block until all the readers release their locks. However, once a thread # starts waiting to obtain a write lock, any additional readers that come along # will also wait (so writers are not starved). # # A thread can acquire both a read and write lock at the same time. A thread can # also acquire a read lock OR a write lock more than once. Only when the read (or # write) lock is released as many times as it was acquired, will the thread # actually let it go, allowing other threads which might have been waiting # to proceed. # # If both read and write locks are acquired by the same thread, it is not strictly # necessary to release them in the same order they were acquired. In other words, # the following code is legal: # # @example # lock = Concurrent::ReentrantReadWriteLock.new # lock.acquire_write_lock # lock.acquire_read_lock # lock.release_write_lock # # At this point, the current thread is holding only a read lock, not a write # # lock. So other threads can take read locks, but not a write lock. # lock.release_read_lock # # Now the current thread is not holding either a read or write lock, so # # another thread could potentially acquire a write lock. # # This implementation was inspired by `java.util.concurrent.ReentrantReadWriteLock`. # # @example # lock = Concurrent::ReentrantReadWriteLock.new # lock.with_read_lock { data.retrieve } # lock.with_write_lock { data.modify! } # # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html java.util.concurrent.ReentrantReadWriteLock class ReentrantReadWriteLock < Synchronization::Object # Implementation notes: # # A goal is to make the uncontended path for both readers/writers mutex-free # Only if there is reader-writer or writer-writer contention, should mutexes be used # Otherwise, a single CAS operation is all we need to acquire/release a lock # # Internal state is represented by a single integer ("counter"), and updated # using atomic compare-and-swap operations # When the counter is 0, the lock is free # Each thread which has one OR MORE read locks increments the counter by 1 # (and decrements by 1 when releasing the read lock) # The counter is increased by (1 << 15) for each writer waiting to acquire the # write lock, and by (1 << 29) if the write lock is taken # # Additionally, each thread uses a thread-local variable to count how many times # it has acquired a read lock, AND how many times it has acquired a write lock. # It uses a similar trick; an increment of 1 means a read lock was taken, and # an increment of (1 << 15) means a write lock was taken # This is what makes re-entrancy possible # # 2 rules are followed to ensure good liveness properties: # 1) Once a writer has queued up and is waiting for a write lock, no other thread # can take a lock without waiting # 2) When a write lock is released, readers are given the "first chance" to wake # up and acquire a read lock # Following these rules means readers and writers tend to "take turns", so neither # can starve the other, even under heavy contention # @!visibility private READER_BITS = 15 # @!visibility private WRITER_BITS = 14 # Used with @Counter: # @!visibility private WAITING_WRITER = 1 << READER_BITS # @!visibility private RUNNING_WRITER = 1 << (READER_BITS + WRITER_BITS) # @!visibility private MAX_READERS = WAITING_WRITER - 1 # @!visibility private MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1 # Used with @HeldCount: # @!visibility private WRITE_LOCK_HELD = 1 << READER_BITS # @!visibility private READ_LOCK_MASK = WRITE_LOCK_HELD - 1 # @!visibility private WRITE_LOCK_MASK = MAX_WRITERS safe_initialization! # Create a new `ReentrantReadWriteLock` in the unlocked state. def initialize super() @Counter = AtomicFixnum.new(0) # single integer which represents lock state @ReadQueue = Synchronization::Lock.new # used to queue waiting readers @WriteQueue = Synchronization::Lock.new # used to queue waiting writers @HeldCount = ThreadLocalVar.new(0) # indicates # of R & W locks held by this thread end # Execute a block operation within a read lock. # # @yield the task to be performed within the lock. # # @return [Object] the result of the block operation. # # @raise [ArgumentError] when no block is given. # @raise [Concurrent::ResourceLimitError] if the maximum number of readers # is exceeded. def with_read_lock raise ArgumentError.new('no block given') unless block_given? acquire_read_lock begin yield ensure release_read_lock end end # Execute a block operation within a write lock. # # @yield the task to be performed within the lock. # # @return [Object] the result of the block operation. # # @raise [ArgumentError] when no block is given. # @raise [Concurrent::ResourceLimitError] if the maximum number of readers # is exceeded. def with_write_lock raise ArgumentError.new('no block given') unless block_given? acquire_write_lock begin yield ensure release_write_lock end end # Acquire a read lock. If a write lock is held by another thread, will block # until it is released. # # @return [Boolean] true if the lock is successfully acquired # # @raise [Concurrent::ResourceLimitError] if the maximum number of readers # is exceeded. def acquire_read_lock if (held = @HeldCount.value) > 0 # If we already have a lock, there's no need to wait if held & READ_LOCK_MASK == 0 # But we do need to update the counter, if we were holding a write # lock but not a read lock @Counter.update { |c| c + 1 } end @HeldCount.value = held + 1 return true end while true c = @Counter.value raise ResourceLimitError.new('Too many reader threads') if max_readers?(c) # If a writer is waiting OR running when we first queue up, we need to wait if waiting_or_running_writer?(c) # Before going to sleep, check again with the ReadQueue mutex held @ReadQueue.synchronize do @ReadQueue.ns_wait if waiting_or_running_writer? end # Note: the above 'synchronize' block could have used #wait_until, # but that waits repeatedly in a loop, checking the wait condition # each time it wakes up (to protect against spurious wakeups) # But we are already in a loop, which is only broken when we successfully # acquire the lock! So we don't care about spurious wakeups, and would # rather not pay the extra overhead of using #wait_until # After a reader has waited once, they are allowed to "barge" ahead of waiting writers # But if a writer is *running*, the reader still needs to wait (naturally) while true c = @Counter.value if running_writer?(c) @ReadQueue.synchronize do @ReadQueue.ns_wait if running_writer? end elsif @Counter.compare_and_set(c, c+1) @HeldCount.value = held + 1 return true end end elsif @Counter.compare_and_set(c, c+1) @HeldCount.value = held + 1 return true end end end # Try to acquire a read lock and return true if we succeed. If it cannot be # acquired immediately, return false. # # @return [Boolean] true if the lock is successfully acquired def try_read_lock if (held = @HeldCount.value) > 0 if held & READ_LOCK_MASK == 0 # If we hold a write lock, but not a read lock... @Counter.update { |c| c + 1 } end @HeldCount.value = held + 1 return true else c = @Counter.value if !waiting_or_running_writer?(c) && @Counter.compare_and_set(c, c+1) @HeldCount.value = held + 1 return true end end false end # Release a previously acquired read lock. # # @return [Boolean] true if the lock is successfully released def release_read_lock held = @HeldCount.value = @HeldCount.value - 1 rlocks_held = held & READ_LOCK_MASK if rlocks_held == 0 c = @Counter.update { |counter| counter - 1 } # If one or more writers were waiting, and we were the last reader, wake a writer up if waiting_or_running_writer?(c) && running_readers(c) == 0 @WriteQueue.signal end elsif rlocks_held == READ_LOCK_MASK raise IllegalOperationError, "Cannot release a read lock which is not held" end true end # Acquire a write lock. Will block and wait for all active readers and writers. # # @return [Boolean] true if the lock is successfully acquired # # @raise [Concurrent::ResourceLimitError] if the maximum number of writers # is exceeded. def acquire_write_lock if (held = @HeldCount.value) >= WRITE_LOCK_HELD # if we already have a write (exclusive) lock, there's no need to wait @HeldCount.value = held + WRITE_LOCK_HELD return true end while true c = @Counter.value raise ResourceLimitError.new('Too many writer threads') if max_writers?(c) # To go ahead and take the lock without waiting, there must be no writer # running right now, AND no writers who came before us still waiting to # acquire the lock # Additionally, if any read locks have been taken, we must hold all of them if c == held # If we successfully swap the RUNNING_WRITER bit on, then we can go ahead if @Counter.compare_and_set(c, c+RUNNING_WRITER) @HeldCount.value = held + WRITE_LOCK_HELD return true end elsif @Counter.compare_and_set(c, c+WAITING_WRITER) while true # Now we have successfully incremented, so no more readers will be able to increment # (they will wait instead) # However, readers OR writers could decrement right here @WriteQueue.synchronize do # So we have to do another check inside the synchronized section # If a writer OR another reader is running, then go to sleep c = @Counter.value @WriteQueue.ns_wait if running_writer?(c) || running_readers(c) != held end # Note: if you are thinking of replacing the above 'synchronize' block # with #wait_until, read the comment in #acquire_read_lock first! # We just came out of a wait # If we successfully turn the RUNNING_WRITER bit on with an atomic swap, # then we are OK to stop waiting and go ahead # Otherwise go back and wait again c = @Counter.value if !running_writer?(c) && running_readers(c) == held && @Counter.compare_and_set(c, c+RUNNING_WRITER-WAITING_WRITER) @HeldCount.value = held + WRITE_LOCK_HELD return true end end end end end # Try to acquire a write lock and return true if we succeed. If it cannot be # acquired immediately, return false. # # @return [Boolean] true if the lock is successfully acquired def try_write_lock if (held = @HeldCount.value) >= WRITE_LOCK_HELD @HeldCount.value = held + WRITE_LOCK_HELD return true else c = @Counter.value if !waiting_or_running_writer?(c) && running_readers(c) == held && @Counter.compare_and_set(c, c+RUNNING_WRITER) @HeldCount.value = held + WRITE_LOCK_HELD return true end end false end # Release a previously acquired write lock. # # @return [Boolean] true if the lock is successfully released def release_write_lock held = @HeldCount.value = @HeldCount.value - WRITE_LOCK_HELD wlocks_held = held & WRITE_LOCK_MASK if wlocks_held == 0 c = @Counter.update { |counter| counter - RUNNING_WRITER } @ReadQueue.broadcast @WriteQueue.signal if waiting_writers(c) > 0 elsif wlocks_held == WRITE_LOCK_MASK raise IllegalOperationError, "Cannot release a write lock which is not held" end true end private # @!visibility private def running_readers(c = @Counter.value) c & MAX_READERS end # @!visibility private def running_readers?(c = @Counter.value) (c & MAX_READERS) > 0 end # @!visibility private def running_writer?(c = @Counter.value) c >= RUNNING_WRITER end # @!visibility private def waiting_writers(c = @Counter.value) (c & MAX_WRITERS) >> READER_BITS end # @!visibility private def waiting_or_running_writer?(c = @Counter.value) c >= WAITING_WRITER end # @!visibility private def max_readers?(c = @Counter.value) (c & MAX_READERS) == MAX_READERS end # @!visibility private def max_writers?(c = @Counter.value) (c & MAX_WRITERS) == MAX_WRITERS end end end