require 'concurrent/dereferenceable'
require 'concurrent/atomic/condition'
require 'concurrent/atomic/event'

module Concurrent

  # An `MVar` is a single-element container that blocks on `get` if it is empty,
  # and blocks on `put` if it is full. It is safe to use an `MVar` from
  # multiple threads. `MVar` can be seen as a single-element blocking queue, or
  # a rendezvous variable.
  #
  # An `MVar` is typically used to transfer objects between threads, where the
  # sending thread will block if the previous message hasn't been taken yet by the
  # receiving thread. It can also be used to control access to some global shared
  # state, where threads `take` the value, perform some operation, and then
  # `put` it back.
  class MVar

    include Dereferenceable

    # Unique value that represents that an `MVar` was empty
    EMPTY = Object.new

    # Unique value that represents that an `MVar` timed out before it was able
    # to produce a value.
    TIMEOUT = Object.new

    # Create a new `MVar`, either empty or with an initial value.
    #
    # @param [Object] value the initial content; leave it out (or pass `EMPTY`)
    #   to create an empty `MVar`
    # @param [Hash] opts options handed to `set_deref_options`, controlling how
    #   values are handed back to callers
    # @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data
    # @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data
    # @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing the internal value and
    #   returning the value returned from the proc
    def initialize(value = EMPTY, opts = {})
      @value = value
      @mutex = Mutex.new
      @empty_condition = Condition.new
      @full_condition = Condition.new
      set_deref_options(opts)
    end

    # Remove the value from an `MVar`, leaving it empty, and blocking if there
    # isn't a value. A timeout can be set to limit the time spent blocked, in
    # which case it returns `TIMEOUT` if the time is exceeded.
    #
    # @param [Numeric, nil] timeout the maximum time to block, or `nil` for no limit
    # @return [Object] the value that was taken, or `TIMEOUT`
    def take(timeout = nil)
      @mutex.synchronize do
        wait_for_full(timeout)

        # If we timed out we'll still be empty
        if unlocked_full?
          value = @value
          @value = EMPTY
          @empty_condition.signal
          apply_deref_options(value)
        else
          TIMEOUT
        end
      end
    end

    # Put a value into an `MVar`, blocking if there is already a value until
    # it is empty. A timeout can be set to limit the time spent blocked, in
    # which case it returns `TIMEOUT` if the time is exceeded.
    #
    # @param [Object] value the value to store
    # @param [Numeric, nil] timeout the maximum time to block, or `nil` for no limit
    # @return [Object] the value that was put, or `TIMEOUT`
    def put(value, timeout = nil)
      @mutex.synchronize do
        wait_for_empty(timeout)

        # If we timed out we won't be empty
        if unlocked_empty?
          @value = value
          @full_condition.signal
          apply_deref_options(value)
        else
          TIMEOUT
        end
      end
    end

    # Atomically `take`, yield the value to a block for transformation, and then
    # `put` the transformed value. Returns the value that was held *before* the
    # transformation. A timeout can be set to limit the time spent blocked, in
    # which case it returns `TIMEOUT` if the time is exceeded.
    #
    # The block runs while the `MVar`'s internal lock is held.
    #
    # @param [Numeric, nil] timeout the maximum time to block, or `nil` for no limit
    # @yield [value] the current content of the `MVar`
    # @yieldreturn [Object] the new content of the `MVar`
    # @return [Object] the pre-transform value, or `TIMEOUT`
    # @raise [ArgumentError] if no block is given
    def modify(timeout = nil)
      raise ArgumentError, 'no block given' unless block_given?

      @mutex.synchronize do
        wait_for_full(timeout)

        # If we timed out we'll still be empty
        if unlocked_full?
          value = @value
          @value = yield value
          # The block may have returned `EMPTY`, so wake whichever side can
          # now make progress rather than assuming the `MVar` is still full.
          unlocked_signal
          apply_deref_options(value)
        else
          TIMEOUT
        end
      end
    end

    # Non-blocking version of `take`, that returns `EMPTY` instead of blocking.
    #
    # @return [Object] the value that was taken, or `EMPTY`
    def try_take!
      @mutex.synchronize do
        if unlocked_full?
          value = @value
          @value = EMPTY
          @empty_condition.signal
          apply_deref_options(value)
        else
          EMPTY
        end
      end
    end

    # Non-blocking version of `put`, that returns whether or not it was successful.
    #
    # @param [Object] value the value to store
    # @return [Boolean] `true` if the value was stored, `false` if the `MVar` was full
    def try_put!(value)
      @mutex.synchronize do
        if unlocked_empty?
          @value = value
          @full_condition.signal
          true
        else
          false
        end
      end
    end

    # Non-blocking version of `put` that will overwrite an existing value.
    #
    # @param [Object] value the value to store
    # @return [Object] the previous content (`EMPTY` if there was none)
    def set!(value)
      @mutex.synchronize do
        old_value = @value
        @value = value
        # `value` may itself be `EMPTY`; signal according to the resulting state.
        unlocked_signal
        apply_deref_options(old_value)
      end
    end

    # Non-blocking version of `modify` that will yield with `EMPTY` if there is no value yet.
    #
    # The block runs while the `MVar`'s internal lock is held.
    #
    # @yield [value] the current content of the `MVar`, or `EMPTY`
    # @yieldreturn [Object] the new content of the `MVar`
    # @return [Object] the pre-transform value
    # @raise [ArgumentError] if no block is given
    def modify!
      raise ArgumentError, 'no block given' unless block_given?

      @mutex.synchronize do
        value = @value
        @value = yield value
        unlocked_signal
        apply_deref_options(value)
      end
    end

    # Returns if the `MVar` is currently empty.
    #
    # @return [Boolean]
    def empty?
      @mutex.synchronize { unlocked_empty? }
    end

    # Returns if the `MVar` currently contains a value.
    #
    # @return [Boolean]
    def full?
      !empty?
    end

    private

    # Emptiness check for callers that already hold `@mutex`. Compares by
    # identity so that a stored object's own `==` can never make a full
    # `MVar` look empty.
    def unlocked_empty?
      EMPTY.equal?(@value)
    end

    # Fullness check for callers that already hold `@mutex`.
    def unlocked_full?
      !unlocked_empty?
    end

    # Signal the condition that matches the current state: `@empty_condition`
    # when the `MVar` is empty, `@full_condition` otherwise. Intended to be
    # called with `@mutex` held.
    def unlocked_signal
      if unlocked_empty?
        @empty_condition.signal
      else
        @full_condition.signal
      end
    end

    # Wait on `@full_condition` while the `MVar` is empty.
    def wait_for_full(timeout)
      wait_while(@full_condition, timeout) { unlocked_empty? }
    end

    # Wait on `@empty_condition` while the `MVar` is full.
    def wait_for_empty(timeout)
      wait_while(@empty_condition, timeout) { unlocked_full? }
    end

    # Repeatedly wait on `condition` for as long as the block returns a truthy
    # value and the latest `Condition::Result` reports `can_wait?`. The
    # predicate is re-checked after every wakeup.
    def wait_while(condition, timeout)
      remaining = Condition::Result.new(timeout)
      while yield && remaining.can_wait?
        remaining = condition.wait(@mutex, remaining.remaining_time)
      end
    end
  end
end