lib/rmega/pool.rb in rmega-0.1.5 vs lib/rmega/pool.rb in rmega-0.1.6

- old
+ new

@@ -1,64 +1,59 @@ require 'thread' module Rmega class Pool - MAX = 5 + MAX = 4 - def initialize(max) - max ||= MAX + def initialize(max = MAX) Thread.abort_on_exception = true + @mutex = Mutex.new - @threads = Array.new(max) - end + @resource = ConditionVariable.new + @max = max || MAX - # Gets the first position of the pool in which - # a thread could be started. - def available_slot - @threads.each_with_index do |thread, index| - return index if thread.nil? or !thread.alive? - end - nil + @running = [] + @queue = [] end - def synchronize(&block) - @mutex.synchronize(&block) + def defer(&block) + synchronize { @queue << block } + process_queue end - # Returns true if all the threads are finished, - # false otherwise. - def done? - @threads.each { |thread| return false if thread and thread.alive? } - true + def wait_done + synchronize { @resource.wait(@mutex) } end - # Blocking. Waits until all the threads are finished. - def wait_done - sleep 0.01 until done? + private + + def synchronize(&block) + @mutex.synchronize(&block) end - # Blocking. Waits until a pool's slot become available and - # returns that position. - # TODO: raise an error on wait timeout. - def wait_available_slot - while true - index = available_slot - return index if index - sleep 0.01 + def process_queue + synchronize do + if @running.size < @max + proc = @queue.shift + @running << Thread.new(&thread_proc(&proc)) if proc + end end end - # Sends a KILL signal to all the threads. - def shutdown - @threads.each { |thread| thread.kill if thread.respond_to?(:kill) } - @threads.map! { nil } + def done? + synchronize { @queue.empty? && @running.empty? } end - # Blocking. Starts a new thread with the given block when a pool's slot - # become available. - def defer(&block) - index = wait_available_slot - @threads[index].kill if @threads[index].respond_to?(:kill) - @threads[index] = Thread.new(&block) + def signal_done + synchronize { @resource.signal } + end + + def thread_proc(&block) + Proc.new do + block.call + @running.reject! { |thread| thread == Thread.current } + process_queue + signal_done if done? + end end end end