lib/thread/pool.rb in thread-0.0.8.1 vs lib/thread/pool.rb in thread-0.1.0
- old
+ new
@@ -115,19 +115,23 @@
@block = block
@cond = ConditionVariable.new
@mutex = Mutex.new
+ @done = ConditionVariable.new
+ @done_mutex = Mutex.new
+
@todo = []
@workers = []
@timeouts = {}
@spawned = 0
@waiting = 0
@shutdown = false
@trim_requests = 0
@auto_trim = false
+ @idle_trim = nil
@mutex.synchronize {
min.times {
spawn_thread
}
@@ -151,10 +155,26 @@
# Disable auto trimming.
def no_auto_trim!
@auto_trim = false
end
+ # Check if idle trimming is enabled.
+ def idle_trim?
+ !@idle_trim.nil?
+ end
+
+ # Enable idle trimming. Unneeded threads will be deleted after the given number of seconds of inactivity.
+ # The minimum number of threads is respeced.
+ def idle_trim!(timeout)
+ @idle_trim = timeout
+ end
+
+ # Turn of idle trimming.
+ def no_idle_trim!
+ @idle_trim = nil
+ end
+
# Resize the pool with the passed arguments.
def resize (min, max = nil)
@min = min
@max = max || min
@@ -166,10 +186,23 @@
@mutex.synchronize {
@todo.length
}
end
+ # Are all tasks consumed ?
+ def done?
+ @todo.empty? and @waiting == @spawned
+ end
+
+ # Wait until all tasks are consumed. The caller will be blocked until then.
+ def wait_done
+ @done_mutex.synchronize {
+ return if done?
+ @done.wait @done_mutex
+ }
+ end
+
# Add a task to the pool which will execute the block with the given
# argument.
#
# If no block is passed the default block will be used if present, an
# ArgumentError will be raised otherwise.
@@ -200,11 +233,11 @@
# Trim the unused threads, if forced threads will be trimmed even if there
# are tasks waiting.
def trim (force = false)
@mutex.synchronize {
if (force || @waiting > 0) && @spawned - @trim_requests > @min
- @trim_requests -= 1
+ @trim_requests += 1
@cond.signal
end
}
self
@@ -304,11 +337,21 @@
end
break if shutdown?
@waiting += 1
- @cond.wait @mutex
+
+ report_done
+
+ if @idle_trim and @spawned > @min
+ check_time = Time.now + @idle_trim
+ @cond.wait @mutex, @idle_trim
+ @trim_requests += 1 if Time.now >= check_time && @spawned - @trim_requests > @min
+ else
+ @cond.wait @mutex
+ end
+
@waiting -= 1
end
break if @todo.empty? && shutdown?
end
@@ -366,9 +409,16 @@
break if @shutdown == :now
end
}
end
+
+ def report_done
+ @done_mutex.synchronize {
+ @done.broadcast if done?
+ }
+ end
+
end
class Thread
# Helper to create a pool.
def self.pool (*args, &block)