lib/promise_pool/promise.rb in promise_pool-0.1.0 vs lib/promise_pool/promise.rb in promise_pool-0.9.0

- old
+ new

@@ -2,12 +2,13 @@ require 'thread' require 'promise_pool/future' module PromisePool class Promise - def self.claim value + def self.claim value, &callback promise = new + promise.then(&callback) if block_given? promise.fulfill(value) promise end def self.backtrace @@ -19,13 +20,13 @@ e.set_backtrace((e.backtrace || caller) + backtrace) end def initialize timer=nil self.value = self.error = self.result = nil - self.resolved = self.called = false + self.resolved = false + self.callbacks = [] - self.k = [] self.timer = timer self.condv = ConditionVariable.new self.mutex = Mutex.new end @@ -52,10 +53,11 @@ end def call self.thread = Thread.current # set working thread protected_yield{ yield } # avoid any exception and do the job + self end def future Future.new(self) end @@ -67,11 +69,16 @@ end # called in client thread (from the future (e.g. body)) def yield wait - mutex.synchronize{ callback } + case result + when Exception + raise result + else + result + end end # called in requesting thread after the request is done def fulfill value mutex.synchronize{ fulfilling(value) } @@ -82,37 +89,41 @@ mutex.synchronize{ rejecting(error) } end # append your actions, which would be called when we're calling back def then &action - k << action + callbacks << action self end def resolved? resolved end protected - attr_accessor :value, :error, :result, :resolved, :called, - :k, :timer, :condv, :mutex, :task, :thread + attr_accessor :value, :error, :result, :resolved, :callbacks, + :timer, :condv, :mutex, :task, :thread private - def fulfilling value + def fulfilling value # should be synchronized self.value = value resolve end - def rejecting error + def rejecting error # should be synchronized self.error = error resolve end - def resolve - self.resolved = true - yield if block_given? + def resolve # should be synchronized + self.result = callbacks.inject(error || value){ |r, k| k.call(r) } + rescue Exception => err + self.class.set_backtrace(err) + self.result = err + log_callback_error(err) ensure + self.resolved = true condv.broadcast # client or response might be waiting end # called in a new thread if pool_size == 0, otherwise from the pool # i.e. requesting thread @@ -128,24 +139,16 @@ reject(err) end def timeout_protected_yield # timeout might already be set for thread_pool (pool_size > 0) - timer.on_timeout{ cancel_task } unless timer + timer.on_timeout{ cancel_task } unless timer.timer yield ensure timer.cancel end - # called in client thread, when yield is called - def callback - return result if called - self.result = k.inject(error || value){ |r, i| i.call(r) } - ensure - self.called = true - end - # timeout! def cancel_task mutex.synchronize do if resolved? # do nothing if it's already done @@ -156,8 +159,15 @@ # fulfill the promise with Timeout::Error task.cancel rejecting(timer.error) end end + end + + # log user callback error, should never raise + def log_callback_error err + warn "#{self.class}: ERROR: #{err}\n from #{err.backtrace.inspect}" + rescue Exception => e + Thread.main.raise(e) if !!$DEBUG end end end