lib/rmega/pool.rb in rmega-0.1.5 vs lib/rmega/pool.rb in rmega-0.1.6
- old
+ new
@@ -1,64 +1,59 @@
require 'thread'
module Rmega
class Pool
- MAX = 5
+ MAX = 4
- def initialize(max)
- max ||= MAX
+ def initialize(max = MAX)
Thread.abort_on_exception = true
+
@mutex = Mutex.new
- @threads = Array.new(max)
- end
+ @resource = ConditionVariable.new
+ @max = max || MAX
- # Gets the first position of the pool in which
- # a thread could be started.
- def available_slot
- @threads.each_with_index do |thread, index|
- return index if thread.nil? or !thread.alive?
- end
- nil
+ @running = []
+ @queue = []
end
- def synchronize(&block)
- @mutex.synchronize(&block)
+ def defer(&block)
+ synchronize { @queue << block }
+ process_queue
end
- # Returns true if all the threads are finished,
- # false otherwise.
- def done?
- @threads.each { |thread| return false if thread and thread.alive? }
- true
+ def wait_done
+ synchronize { @resource.wait(@mutex) }
end
- # Blocking. Waits until all the threads are finished.
- def wait_done
- sleep 0.01 until done?
+ private
+
+ def synchronize(&block)
+ @mutex.synchronize(&block)
end
- # Blocking. Waits until a pool's slot become available and
- # returns that position.
- # TODO: raise an error on wait timeout.
- def wait_available_slot
- while true
- index = available_slot
- return index if index
- sleep 0.01
+ def process_queue
+ synchronize do
+ if @running.size < @max
+ proc = @queue.shift
+ @running << Thread.new(&thread_proc(&proc)) if proc
+ end
end
end
- # Sends a KILL signal to all the threads.
- def shutdown
- @threads.each { |thread| thread.kill if thread.respond_to?(:kill) }
- @threads.map! { nil }
+ def done?
+ synchronize { @queue.empty? && @running.empty? }
end
- # Blocking. Starts a new thread with the given block when a pool's slot
- # become available.
- def defer(&block)
- index = wait_available_slot
- @threads[index].kill if @threads[index].respond_to?(:kill)
- @threads[index] = Thread.new(&block)
+ def signal_done
+ synchronize { @resource.signal }
+ end
+
+ def thread_proc(&block)
+ Proc.new do
+ block.call
+ @running.reject! { |thread| thread == Thread.current }
+ process_queue
+ signal_done if done?
+ end
end
end
end