lib/thread/pool.rb in thread-0.1.6 vs lib/thread/pool.rb in thread-0.1.7

- old
+ new

@@ -187,60 +187,67 @@ @mutex.synchronize { @todo.length } end - # Are all tasks consumed ? + # Are all tasks consumed? def done? @mutex.synchronize { - @todo.empty? and @waiting == @spawned + _done? } end # Wait until all tasks are consumed. The caller will be blocked until then. - def wait_done - loop do - @done_mutex.synchronize { - return self if done? - @done.wait @done_mutex - } + def wait(what = :idle) + case what + when :done + loop do + @done_mutex.synchronize { + return self if _done? + + @done.wait @done_mutex + } + end + + when :idle + until idle? + @done_mutex.synchronize { + break if _idle? + + @done.wait @done_mutex + } + end end + + self end # Check if there are idle workers. def idle? @mutex.synchronize { - @todo.length < @waiting + _idle? } end - # Process Block when there is a idle worker if not block its returns - def idle (*args, &block) - while !idle? - @done_mutex.synchronize { - break if idle? - @done.wait @done_mutex - } - end - - unless block - return - end - - process *args, &block - end - # Add a task to the pool which will execute the block with the given # argument. # # If no block is passed the default block will be used if present, an # ArgumentError will be raised otherwise. def process (*args, &block) unless block || @block raise ArgumentError, 'you must pass a block' end + wait.process!(*args, &block) + end + + def process! (*args, &block) + unless block || @block + raise ArgumentError, 'you must pass a block' + end + task = Task.new(self, *args, &(block || @block)) @mutex.synchronize { raise 'unable to add work while shutting down' if shutdown? @@ -254,11 +261,11 @@ } task end - alias << process + alias << process! # Trim the unused threads, if forced threads will be trimmed even if there # are tasks waiting. def trim (force = false) @mutex.synchronize { @@ -348,11 +355,11 @@ # # Similar to Thread.abort_on_exception attr_accessor :abort_on_exception end - private +private def wake_up_timeout if defined? @pipes @pipes.last.write_nonblock 'x' rescue nil end end @@ -373,11 +380,11 @@ break if shutdown? @waiting += 1 - report_done + done! if @idle_trim and @spawned > @min check_time = Time.now + @idle_trim @cond.wait @mutex, @idle_trim @trim_requests += 1 if Time.now >= check_time && @spawned - @trim_requests > @min @@ -445,12 +452,20 @@ break if @shutdown == :now end } end - def report_done + def _done? + @todo.empty? and @waiting == @spawned + end + + def _idle? + @todo.length < @waiting + end + + def done! @done_mutex.synchronize { - @done.broadcast if done? or idle? + @done.broadcast if _done? or _idle? } end end class Thread