lib/promise_pool/promise.rb in promise_pool-0.1.0 vs lib/promise_pool/promise.rb in promise_pool-0.9.0
- old
+ new
@@ -2,12 +2,13 @@
require 'thread'
require 'promise_pool/future'
module PromisePool
class Promise
- def self.claim value
+ def self.claim value, &callback
promise = new
+ promise.then(&callback) if block_given?
promise.fulfill(value)
promise
end
def self.backtrace
@@ -19,13 +20,13 @@
e.set_backtrace((e.backtrace || caller) + backtrace)
end
def initialize timer=nil
self.value = self.error = self.result = nil
- self.resolved = self.called = false
+ self.resolved = false
+ self.callbacks = []
- self.k = []
self.timer = timer
self.condv = ConditionVariable.new
self.mutex = Mutex.new
end
@@ -52,10 +53,11 @@
end
def call
self.thread = Thread.current # set working thread
protected_yield{ yield } # avoid any exception and do the job
+ self
end
def future
Future.new(self)
end
@@ -67,11 +69,16 @@
end
# called in client thread (from the future (e.g. body))
def yield
wait
- mutex.synchronize{ callback }
+ case result
+ when Exception
+ raise result
+ else
+ result
+ end
end
# called in requesting thread after the request is done
def fulfill value
mutex.synchronize{ fulfilling(value) }
@@ -82,37 +89,41 @@
mutex.synchronize{ rejecting(error) }
end
# append your actions, which would be called when we're calling back
def then &action
- k << action
+ callbacks << action
self
end
def resolved?
resolved
end
protected
- attr_accessor :value, :error, :result, :resolved, :called,
- :k, :timer, :condv, :mutex, :task, :thread
+ attr_accessor :value, :error, :result, :resolved, :callbacks,
+ :timer, :condv, :mutex, :task, :thread
private
- def fulfilling value
+ def fulfilling value # should be synchronized
self.value = value
resolve
end
- def rejecting error
+ def rejecting error # should be synchronized
self.error = error
resolve
end
- def resolve
- self.resolved = true
- yield if block_given?
+ def resolve # should be synchronized
+ self.result = callbacks.inject(error || value){ |r, k| k.call(r) }
+ rescue Exception => err
+ self.class.set_backtrace(err)
+ self.result = err
+ log_callback_error(err)
ensure
+ self.resolved = true
condv.broadcast # client or response might be waiting
end
# called in a new thread if pool_size == 0, otherwise from the pool
# i.e. requesting thread
@@ -128,24 +139,16 @@
reject(err)
end
def timeout_protected_yield
# timeout might already be set for thread_pool (pool_size > 0)
- timer.on_timeout{ cancel_task } unless timer
+ timer.on_timeout{ cancel_task } unless timer.timer
yield
ensure
timer.cancel
end
- # called in client thread, when yield is called
- def callback
- return result if called
- self.result = k.inject(error || value){ |r, i| i.call(r) }
- ensure
- self.called = true
- end
-
# timeout!
def cancel_task
mutex.synchronize do
if resolved?
# do nothing if it's already done
@@ -156,8 +159,15 @@
# fulfill the promise with Timeout::Error
task.cancel
rejecting(timer.error)
end
end
+ end
+
+ # log user callback error, should never raise
+ def log_callback_error err
+ warn "#{self.class}: ERROR: #{err}\n from #{err.backtrace.inspect}"
+ rescue Exception => e
+ Thread.main.raise(e) if !!$DEBUG
end
end
end