require 'thread' require 'concurrent/errors' require 'concurrent/collection/copy_on_write_observer_set' require 'concurrent/concern/obligation' require 'concurrent/concern/observable' require 'concurrent/synchronization' module Concurrent # An `IVar` is like a future that you can assign. As a future is a value that # is being computed that you can wait on, an `IVar` is a value that is waiting # to be assigned, that you can wait on. `IVars` are single assignment and # deterministic. # # Then, express futures as an asynchronous computation that assigns an `IVar`. # The `IVar` becomes the primitive on which [futures](Future) and # [dataflow](Dataflow) are built. # # An `IVar` is a single-element container that is normally created empty, and # can only be set once. The I in `IVar` stands for immutable. Reading an # `IVar` normally blocks until it is set. It is safe to set and read an `IVar` # from different threads. # # If you want to have some parallel task set the value in an `IVar`, you want # a `Future`. If you want to create a graph of parallel tasks all executed # when the values they depend on are ready you want `dataflow`. `IVar` is # generally a low-level primitive. # # ## Examples # # Create, set and get an `IVar` # # ```ruby # ivar = Concurrent::IVar.new # ivar.set 14 # ivar.value #=> 14 # ivar.set 2 # would now be an error # ``` # # ## See Also # # 1. For the theory: Arvind, R. Nikhil, and K. Pingali. # [I-Structures: Data structures for parallel computing](http://dl.acm.org/citation.cfm?id=69562). # In Proceedings of Workshop on Graph Reduction, 1986. # 2. For recent application: # [DataDrivenFuture in Habanero Java from Rice](http://www.cs.rice.edu/~vs3/hjlib/doc/edu/rice/hj/api/HjDataDrivenFuture.html). class IVar < Synchronization::Object include Concern::Obligation include Concern::Observable # @!visibility private NO_VALUE = Object.new # :nodoc: # Create a new `IVar` in the `:pending` state with the (optional) initial value. # # @param [Object] value the initial value # @param [Hash] opts the options to create a message with # @option opts [String] :dup_on_deref (false) call `#dup` before returning # the data # @option opts [String] :freeze_on_deref (false) call `#freeze` before # returning the data # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing # the internal value and returning the value returned from the proc def initialize(value = NO_VALUE, opts = {}, &block) if value != NO_VALUE && block_given? raise ArgumentError.new('provide only a value or a block') end super(&nil) synchronize { ns_initialize(value, opts, &block) } end # Add an observer on this object that will receive notification on update. # # Upon completion the `IVar` will notify all observers in a thread-safe way. # The `func` method of the observer will be called with three arguments: the # `Time` at which the `Future` completed the asynchronous operation, the # final `value` (or `nil` on rejection), and the final `reason` (or `nil` on # fulfillment). # # @param [Object] observer the object that will be notified of changes # @param [Symbol] func symbol naming the method to call when this # `Observable` has changes` def add_observer(observer = nil, func = :update, &block) raise ArgumentError.new('cannot provide both an observer and a block') if observer && block direct_notification = false if block observer = block func = :call end synchronize do if event.set? direct_notification = true else observers.add_observer(observer, func) end end observer.send(func, Time.now, self.value, reason) if direct_notification observer end # @!macro [attach] ivar_set_method # Set the `IVar` to a value and wake or notify all threads waiting on it. # # @!macro [attach] ivar_set_parameters_and_exceptions # @param [Object] value the value to store in the `IVar` # @yield A block operation to use for setting the value # @raise [ArgumentError] if both a value and a block are given # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already # been set or otherwise completed # # @return [IVar] self def set(value = NO_VALUE) check_for_block_or_value!(block_given?, value) raise MultipleAssignmentError unless compare_and_set_state(:processing, :pending) begin value = yield if block_given? complete_without_notification(true, value, nil) rescue => ex complete_without_notification(false, nil, ex) end notify_observers(self.value, reason) self end # @!macro [attach] ivar_fail_method # Set the `IVar` to failed due to some error and wake or notify all threads waiting on it. # # @param [Object] reason for the failure # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already # been set or otherwise completed # @return [IVar] self def fail(reason = StandardError.new) complete(false, nil, reason) end # Attempt to set the `IVar` with the given value or block. Return a # boolean indicating the success or failure of the set operation. # # @!macro ivar_set_parameters_and_exceptions # # @return [Boolean] true if the value was set else false def try_set(value = NO_VALUE, &block) set(value, &block) true rescue MultipleAssignmentError false end protected # @!visibility private def ns_initialize(value, opts) value = yield if block_given? init_obligation(self) self.observers = Collection::CopyOnWriteObserverSet.new set_deref_options(opts) if value == NO_VALUE @state = :pending else ns_complete_without_notification(true, value, nil) end end # @!visibility private def safe_execute(task, args = []) if compare_and_set_state(:processing, :pending) success, val, reason = SafeTaskExecutor.new(task, rescue_exception: true).execute(*@args) complete(success, val, reason) yield(success, val, reason) if block_given? end end # @!visibility private def complete(success, value, reason) complete_without_notification(success, value, reason) notify_observers(self.value, reason) self end # @!visibility private def complete_without_notification(success, value, reason) synchronize { ns_complete_without_notification(success, value, reason) } self end # @!visibility private def notify_observers(value, reason) observers.notify_and_delete_observers{ [Time.now, value, reason] } end # @!visibility private def ns_complete_without_notification(success, value, reason) raise MultipleAssignmentError if [:fulfilled, :rejected].include? @state set_state(success, value, reason) event.set end # @!visibility private def check_for_block_or_value!(block_given, value) # :nodoc: if (block_given && value != NO_VALUE) || (! block_given && value == NO_VALUE) raise ArgumentError.new('must set with either a value or a block') end end end end