lib/concurrent/promise.rb in concurrent-ruby-0.2.0 vs lib/concurrent/promise.rb in concurrent-ruby-0.2.1
- old
+ new
@@ -1,174 +1,174 @@
-require 'thread'
-
-require 'concurrent/global_thread_pool'
-require 'concurrent/obligation'
-require 'concurrent/utilities'
-
-module Concurrent
-
- class Promise
- include Obligation
- include UsesGlobalThreadPool
-
- behavior(:future)
- behavior(:promise)
-
- # Creates a new promise object. "A promise represents the eventual
- # value returned from the single completion of an operation."
- # Promises can be chained in a tree structure where each promise
- # has zero or more children. Promises are resolved asynchronously
- # in the order they are added to the tree. Parents are guaranteed
- # to be resolved before their children. The result of each promise
- # is passed to each of its children upon resolution. When
- # a promise is rejected all its children will be summarily rejected.
- # A promise that is neither resolved or rejected is pending.
- #
- # @param args [Array] zero or more arguments for the block
- # @param block [Proc] the block to call when attempting fulfillment
- #
- # @see http://wiki.commonjs.org/wiki/Promises/A
- # @see http://promises-aplus.github.io/promises-spec/
- def initialize(*args, &block)
- if args.first.is_a?(Promise)
- @parent = args.first
- else
- @parent = nil
- @chain = [self]
- end
-
- @lock = Mutex.new
- @handler = block || Proc.new{|result| result }
- @state = :pending
- @value = nil
- @reason = nil
- @rescued = false
- @children = []
- @rescuers = []
-
- realize(*args) if root?
- end
-
- def rescued?
- return @rescued
- end
-
- # Create a new child Promise. The block argument for the child will
- # be the result of fulfilling its parent. If the child will
- # immediately be rejected if the parent has already been rejected.
- #
- # @param block [Proc] the block to call when attempting fulfillment
- #
- # @return [Promise] the new promise
- def then(&block)
- child = @lock.synchronize do
- block = Proc.new{|result| result } unless block_given?
- @children << Promise.new(self, &block)
- @children.last.on_reject(@reason) if rejected?
- push(@children.last)
- @children.last
- end
- return child
- end
-
- # Add a rescue handler to be run if the promise is rejected (via raised
- # exception). Multiple rescue handlers may be added to a Promise.
- # Rescue blocks will be checked in order and the first one with a
- # matching Exception class will be processed. The block argument
- # will be the exception that caused the rejection.
- #
- # @param clazz [Class] The class of exception to rescue
- # @param block [Proc] the block to call if the rescue is matched
- #
- # @return [self] so that additional chaining can occur
- def rescue(clazz = Exception, &block)
- return self if fulfilled? || rescued? || ! block_given?
- @lock.synchronize do
- rescuer = Rescuer.new(clazz, block)
- if pending?
- @rescuers << rescuer
- else
- try_rescue(reason, rescuer)
- end
- end
- return self
- end
- alias_method :catch, :rescue
- alias_method :on_error, :rescue
-
- protected
-
- attr_reader :parent
- attr_reader :handler
- attr_reader :rescuers
-
- # @private
- Rescuer = Struct.new(:clazz, :block)
-
- # @private
- def root? # :nodoc:
- @parent.nil?
- end
-
- # @private
- def push(promise) # :nodoc:
- if root?
- @chain << promise
- else
- @parent.push(promise)
- end
- end
-
- # @private
- def on_fulfill(value) # :nodoc:
- @lock.synchronize do
- @value = @handler.call(value)
- @state = :fulfilled
- @reason = nil
- end
- return @value
- end
-
- # @private
- def on_reject(reason) # :nodoc:
- @value = nil
- @state = :rejected
- @reason = reason
- try_rescue(reason)
- @children.each{|child| child.on_reject(reason) }
- end
-
- # @private
- def try_rescue(ex, *rescuers) # :nodoc:
- rescuers = @rescuers if rescuers.empty?
- rescuer = rescuers.find{|r| ex.is_a?(r.clazz) }
- if rescuer
- rescuer.block.call(ex)
- @rescued = true
- end
- rescue Exception => e
- # supress
- end
-
- # @private
- def realize(*args) # :nodoc:
- Promise.thread_pool.post(@chain, @lock, args) do |chain, lock, args|
- result = args.length == 1 ? args.first : args
- index = 0
- loop do
- current = lock.synchronize{ chain[index] }
- unless current.rejected?
- current.mutex.synchronize do
- begin
- result = current.on_fulfill(result)
- rescue Exception => ex
- current.on_reject(ex)
- end
- end
- end
- index += 1
- Thread.pass while index >= chain.length
- end
- end
- end
- end
-end
+require 'thread'
+
+require 'concurrent/global_thread_pool'
+require 'concurrent/obligation'
+require 'concurrent/utilities'
+
+module Concurrent
+
+ class Promise
+ include Obligation
+ include UsesGlobalThreadPool
+
+ behavior(:future)
+ behavior(:promise)
+
+ # Creates a new promise object. "A promise represents the eventual
+ # value returned from the single completion of an operation."
+ # Promises can be chained in a tree structure where each promise
+ # has zero or more children. Promises are resolved asynchronously
+ # in the order they are added to the tree. Parents are guaranteed
+ # to be resolved before their children. The result of each promise
+ # is passed to each of its children upon resolution. When
+ # a promise is rejected all its children will be summarily rejected.
+ # A promise that is neither resolved or rejected is pending.
+ #
+ # @param args [Array] zero or more arguments for the block
+ # @param block [Proc] the block to call when attempting fulfillment
+ #
+ # @see http://wiki.commonjs.org/wiki/Promises/A
+ # @see http://promises-aplus.github.io/promises-spec/
+ def initialize(*args, &block)
+ if args.first.is_a?(Promise)
+ @parent = args.first
+ else
+ @parent = nil
+ @chain = [self]
+ end
+
+ @lock = Mutex.new
+ @handler = block || Proc.new{|result| result }
+ @state = :pending
+ @value = nil
+ @reason = nil
+ @rescued = false
+ @children = []
+ @rescuers = []
+
+ realize(*args) if root?
+ end
+
+ def rescued?
+ return @rescued
+ end
+
+ # Create a new child Promise. The block argument for the child will
+ # be the result of fulfilling its parent. If the child will
+ # immediately be rejected if the parent has already been rejected.
+ #
+ # @param block [Proc] the block to call when attempting fulfillment
+ #
+ # @return [Promise] the new promise
+ def then(&block)
+ child = @lock.synchronize do
+ block = Proc.new{|result| result } unless block_given?
+ @children << Promise.new(self, &block)
+ @children.last.on_reject(@reason) if rejected?
+ push(@children.last)
+ @children.last
+ end
+ return child
+ end
+
+ # Add a rescue handler to be run if the promise is rejected (via raised
+ # exception). Multiple rescue handlers may be added to a Promise.
+ # Rescue blocks will be checked in order and the first one with a
+ # matching Exception class will be processed. The block argument
+ # will be the exception that caused the rejection.
+ #
+ # @param clazz [Class] The class of exception to rescue
+ # @param block [Proc] the block to call if the rescue is matched
+ #
+ # @return [self] so that additional chaining can occur
+ def rescue(clazz = Exception, &block)
+ return self if fulfilled? || rescued? || ! block_given?
+ @lock.synchronize do
+ rescuer = Rescuer.new(clazz, block)
+ if pending?
+ @rescuers << rescuer
+ else
+ try_rescue(reason, rescuer)
+ end
+ end
+ return self
+ end
+ alias_method :catch, :rescue
+ alias_method :on_error, :rescue
+
+ protected
+
+ attr_reader :parent
+ attr_reader :handler
+ attr_reader :rescuers
+
+ # @private
+ Rescuer = Struct.new(:clazz, :block)
+
+ # @private
+ def root? # :nodoc:
+ @parent.nil?
+ end
+
+ # @private
+ def push(promise) # :nodoc:
+ if root?
+ @chain << promise
+ else
+ @parent.push(promise)
+ end
+ end
+
+ # @private
+ def on_fulfill(value) # :nodoc:
+ @lock.synchronize do
+ @value = @handler.call(value)
+ @state = :fulfilled
+ @reason = nil
+ end
+ return @value
+ end
+
+ # @private
+ def on_reject(reason) # :nodoc:
+ @value = nil
+ @state = :rejected
+ @reason = reason
+ try_rescue(reason)
+ @children.each{|child| child.on_reject(reason) }
+ end
+
+ # @private
+ def try_rescue(ex, *rescuers) # :nodoc:
+ rescuers = @rescuers if rescuers.empty?
+ rescuer = rescuers.find{|r| ex.is_a?(r.clazz) }
+ if rescuer
+ rescuer.block.call(ex)
+ @rescued = true
+ end
+ rescue Exception => e
+ # supress
+ end
+
+ # @private
+ def realize(*args) # :nodoc:
+ Promise.thread_pool.post(@chain, @lock, args) do |chain, lock, args|
+ result = args.length == 1 ? args.first : args
+ index = 0
+ loop do
+ current = lock.synchronize{ chain[index] }
+ unless current.rejected?
+ current.mutex.synchronize do
+ begin
+ result = current.on_fulfill(result)
+ rescue Exception => ex
+ current.on_reject(ex)
+ end
+ end
+ end
+ index += 1
+ Thread.pass while index >= chain.length
+ end
+ end
+ end
+ end
+end