lib/thread/pool.rb in thread-0.1.6 vs lib/thread/pool.rb in thread-0.1.7
- old
+ new
@@ -187,60 +187,67 @@
@mutex.synchronize {
@todo.length
}
end
- # Are all tasks consumed ?
+ # Are all tasks consumed?
def done?
@mutex.synchronize {
- @todo.empty? and @waiting == @spawned
+ _done?
}
end
# Wait until all tasks are consumed. The caller will be blocked until then.
- def wait_done
- loop do
- @done_mutex.synchronize {
- return self if done?
- @done.wait @done_mutex
- }
+ def wait(what = :idle)
+ case what
+ when :done
+ loop do
+ @done_mutex.synchronize {
+ return self if _done?
+
+ @done.wait @done_mutex
+ }
+ end
+
+ when :idle
+ until idle?
+ @done_mutex.synchronize {
+ break if _idle?
+
+ @done.wait @done_mutex
+ }
+ end
end
+
+ self
end
# Check if there are idle workers.
def idle?
@mutex.synchronize {
- @todo.length < @waiting
+ _idle?
}
end
- # Process Block when there is a idle worker if not block its returns
- def idle (*args, &block)
- while !idle?
- @done_mutex.synchronize {
- break if idle?
- @done.wait @done_mutex
- }
- end
-
- unless block
- return
- end
-
- process *args, &block
- end
-
# Add a task to the pool which will execute the block with the given
# argument.
#
# If no block is passed the default block will be used if present, an
# ArgumentError will be raised otherwise.
def process (*args, &block)
unless block || @block
raise ArgumentError, 'you must pass a block'
end
+ wait.process!(*args, &block)
+ end
+
+ def process! (*args, &block)
+ unless block || @block
+ raise ArgumentError, 'you must pass a block'
+ end
+
task = Task.new(self, *args, &(block || @block))
@mutex.synchronize {
raise 'unable to add work while shutting down' if shutdown?
@@ -254,11 +261,11 @@
}
task
end
- alias << process
+ alias << process!
# Trim the unused threads, if forced threads will be trimmed even if there
# are tasks waiting.
def trim (force = false)
@mutex.synchronize {
@@ -348,11 +355,11 @@
#
# Similar to Thread.abort_on_exception
attr_accessor :abort_on_exception
end
- private
+private
def wake_up_timeout
if defined? @pipes
@pipes.last.write_nonblock 'x' rescue nil
end
end
@@ -373,11 +380,11 @@
break if shutdown?
@waiting += 1
- report_done
+ done!
if @idle_trim and @spawned > @min
check_time = Time.now + @idle_trim
@cond.wait @mutex, @idle_trim
@trim_requests += 1 if Time.now >= check_time && @spawned - @trim_requests > @min
@@ -445,12 +452,20 @@
break if @shutdown == :now
end
}
end
- def report_done
+ def _done?
+ @todo.empty? and @waiting == @spawned
+ end
+
+ def _idle?
+ @todo.length < @waiting
+ end
+
+ def done!
@done_mutex.synchronize {
- @done.broadcast if done? or idle?
+ @done.broadcast if _done? or _idle?
}
end
end
class Thread