lib/thread/pool.rb in thread-0.0.8.1 vs lib/thread/pool.rb in thread-0.1.0

- old
+ new

@@ -115,19 +115,23 @@ @block = block @cond = ConditionVariable.new @mutex = Mutex.new + @done = ConditionVariable.new + @done_mutex = Mutex.new + @todo = [] @workers = [] @timeouts = {} @spawned = 0 @waiting = 0 @shutdown = false @trim_requests = 0 @auto_trim = false + @idle_trim = nil @mutex.synchronize { min.times { spawn_thread } @@ -151,10 +155,26 @@ # Disable auto trimming. def no_auto_trim! @auto_trim = false end + # Check if idle trimming is enabled. + def idle_trim? + !@idle_trim.nil? + end + + # Enable idle trimming. Unneeded threads will be deleted after the given number of seconds of inactivity. + # The minimum number of threads is respeced. + def idle_trim!(timeout) + @idle_trim = timeout + end + + # Turn of idle trimming. + def no_idle_trim! + @idle_trim = nil + end + # Resize the pool with the passed arguments. def resize (min, max = nil) @min = min @max = max || min @@ -166,10 +186,23 @@ @mutex.synchronize { @todo.length } end + # Are all tasks consumed ? + def done? + @todo.empty? and @waiting == @spawned + end + + # Wait until all tasks are consumed. The caller will be blocked until then. + def wait_done + @done_mutex.synchronize { + return if done? + @done.wait @done_mutex + } + end + # Add a task to the pool which will execute the block with the given # argument. # # If no block is passed the default block will be used if present, an # ArgumentError will be raised otherwise. @@ -200,11 +233,11 @@ # Trim the unused threads, if forced threads will be trimmed even if there # are tasks waiting. def trim (force = false) @mutex.synchronize { if (force || @waiting > 0) && @spawned - @trim_requests > @min - @trim_requests -= 1 + @trim_requests += 1 @cond.signal end } self @@ -304,11 +337,21 @@ end break if shutdown? @waiting += 1 - @cond.wait @mutex + + report_done + + if @idle_trim and @spawned > @min + check_time = Time.now + @idle_trim + @cond.wait @mutex, @idle_trim + @trim_requests += 1 if Time.now >= check_time && @spawned - @trim_requests > @min + else + @cond.wait @mutex + end + @waiting -= 1 end break if @todo.empty? && shutdown? end @@ -366,9 +409,16 @@ break if @shutdown == :now end } end + + def report_done + @done_mutex.synchronize { + @done.broadcast if done? + } + end + end class Thread # Helper to create a pool. def self.pool (*args, &block)