lib/thread/pool.rb in thread-0.1.4 vs lib/thread/pool.rb in thread-0.1.5
- old
+ new
@@ -59,10 +59,11 @@
@timedout = true
elsif reason.is_a? Asked
return
else
@exception = reason
+ raise @exception if Thread::Pool.abort_on_exception
end
end
@running = false
@finished = true
@@ -117,11 +118,11 @@
@cond = ConditionVariable.new
@mutex = Mutex.new
@done = ConditionVariable.new
@done_mutex = Mutex.new
-
+
@todo = []
@workers = []
@timeouts = {}
@spawned = 0
@@ -198,33 +199,33 @@
@done_mutex.synchronize {
return if done?
@done.wait @done_mutex
}
end
-
- # Check if there are idle workers.
- def idle?
- @todo.length < @waiting
- end
- # Process Block when there is a idle worker if not block its returns
- def idle (&block)
- while !idle?
- @done_mutex.synchronize {
- break if idle?
- @done.wait @done_mutex
- }
- end
+ # Check if there are idle workers.
+ def idle?
+ @todo.length < @waiting
+ end
- unless block
- return
- end
+ # Process Block when there is a idle worker if not block its returns
+ def idle (*args, &block)
+ while !idle?
+ @done_mutex.synchronize {
+ break if idle?
+ @done.wait @done_mutex
+ }
+ end
- process &block
+ unless block
+ return
+ end
- end
-
+ process *args, &block
+
+ end
+
# Add a task to the pool which will execute the block with the given
# argument.
#
# If no block is passed the default block will be used if present, an
# ArgumentError will be raised otherwise.
@@ -335,11 +336,18 @@
}
self
end
-private
+ class << self
+ # If true, tasks will allow raised exceptions to pass through.
+ #
+ # Similar to Thread.abort_on_exception
+ attr_accessor :abort_on_exception
+ end
+
+ private
def wake_up_timeout
if defined? @pipes
@pipes.last.write_nonblock 'x' rescue nil
end
end
@@ -396,11 +404,11 @@
@workers << thread
thread
end
-
+
def spawn_timeout_thread
@pipes = IO.pipe
@timeout = Thread.new {
loop do
now = Time.now
@@ -426,11 +434,11 @@
task.timeout!
end
}
@timeouts.reject! { |task, _| task.terminated? || task.finished? }
-
+
break if @shutdown == :now
end
}
end
@@ -444,6 +452,6 @@
class Thread
# Helper to create a pool.
def self.pool (*args, &block)
Thread::Pool.new(*args, &block)
end
-end
\ No newline at end of file
+end