lib/rmega/pool.rb in rmega-0.2.7 vs lib/rmega/pool.rb in rmega-0.3.1

- old
+ new

@@ -1,70 +1,58 @@ module Rmega class Pool include Options - + def initialize threads_raises_exceptions - @mutex = Mutex.new - @resource = ConditionVariable.new - @max = options.thread_pool_size + @queue = Queue.new + @queue_closed = false + @threads = [] + @cv = ConditionVariable.new + @working_threads = 0 + + options.thread_pool_size.times do + @threads << Thread.new do + while proc = @queue.pop + mutex.synchronize do + @working_threads += 1 + end + + proc.call - @running = [] - @queue = [] - end - - def threads_raises_exceptions - Thread.abort_on_exception = true - end - - def defer(&block) - synchronize { @queue << block } - process_queue - end - - alias :process :defer - - def wait_done - return if done? - synchronize { @resource.wait(@mutex) } - end - - alias :shutdown :wait_done - - private - - def synchronize(&block) - @mutex.synchronize(&block) - end - - def process_queue - synchronize do - if @running.size < @max - proc = @queue.shift - @running << Thread.new(&thread_proc(&proc)) if proc + mutex.synchronize do + @working_threads -= 1 + + if @queue_closed and @queue.empty? and @working_threads == 0 + @cv.signal + end + end + end end end end - def done? - synchronize { @queue.empty? && @running.empty? } + def mutex + @mutex ||= Mutex.new end - def signal_done - synchronize { @resource.signal } + def threads_raises_exceptions + Thread.abort_on_exception = true end - def thread_terminated - synchronize { @running.reject! { |thread| thread == Thread.current } } + def process(&block) + @queue << block end + + def wait_done + @queue.close if @queue.respond_to?(:close) + @queue_closed = true - def thread_proc(&block) - Proc.new do - block.call - thread_terminated - process_queue - signal_done if done? + mutex.synchronize do + @cv.wait(mutex) end + + @threads.each(&:kill) end end end