lib/rmega/pool.rb in rmega-0.2.7 vs lib/rmega/pool.rb in rmega-0.3.1
- old
+ new
@@ -1,70 +1,58 @@
module Rmega
class Pool
include Options
-
+
def initialize
threads_raises_exceptions
- @mutex = Mutex.new
- @resource = ConditionVariable.new
- @max = options.thread_pool_size
+ @queue = Queue.new
+ @queue_closed = false
+ @threads = []
+ @cv = ConditionVariable.new
+ @working_threads = 0
+
+ options.thread_pool_size.times do
+ @threads << Thread.new do
+ while proc = @queue.pop
+ mutex.synchronize do
+ @working_threads += 1
+ end
+
+ proc.call
- @running = []
- @queue = []
- end
-
- def threads_raises_exceptions
- Thread.abort_on_exception = true
- end
-
- def defer(&block)
- synchronize { @queue << block }
- process_queue
- end
-
- alias :process :defer
-
- def wait_done
- return if done?
- synchronize { @resource.wait(@mutex) }
- end
-
- alias :shutdown :wait_done
-
- private
-
- def synchronize(&block)
- @mutex.synchronize(&block)
- end
-
- def process_queue
- synchronize do
- if @running.size < @max
- proc = @queue.shift
- @running << Thread.new(&thread_proc(&proc)) if proc
+ mutex.synchronize do
+ @working_threads -= 1
+
+ if @queue_closed and @queue.empty? and @working_threads == 0
+ @cv.signal
+ end
+ end
+ end
end
end
end
- def done?
- synchronize { @queue.empty? && @running.empty? }
+ def mutex
+ @mutex ||= Mutex.new
end
- def signal_done
- synchronize { @resource.signal }
+ def threads_raises_exceptions
+ Thread.abort_on_exception = true
end
- def thread_terminated
- synchronize { @running.reject! { |thread| thread == Thread.current } }
+ def process(&block)
+ @queue << block
end
+
+ def wait_done
+ @queue.close if @queue.respond_to?(:close)
+ @queue_closed = true
- def thread_proc(&block)
- Proc.new do
- block.call
- thread_terminated
- process_queue
- signal_done if done?
+ mutex.synchronize do
+ @cv.wait(mutex)
end
+
+ @threads.each(&:kill)
end
end
end