Sha256: e128686d8c37cf56c415139621d5fb27cf2d6e2b364f42d680a13c9baf3601cf
Contents?: true
Size: 1.25 KB
Versions: 7
Compression:
Stored size: 1.25 KB
Contents
module Rmega class Pool include Options def initialize threads_raises_exceptions @mutex = Mutex.new @resource = ConditionVariable.new @max = options.thread_pool_size @running = [] @queue = [] end def threads_raises_exceptions Thread.abort_on_exception = true end def defer(&block) synchronize { @queue << block } process_queue end alias :process :defer def wait_done return if done? synchronize { @resource.wait(@mutex) } end alias :shutdown :wait_done private def synchronize(&block) @mutex.synchronize(&block) end def process_queue synchronize do if @running.size < @max proc = @queue.shift @running << Thread.new(&thread_proc(&proc)) if proc end end end def done? synchronize { @queue.empty? && @running.empty? } end def signal_done synchronize { @resource.signal } end def thread_terminated synchronize { @running.reject! { |thread| thread == Thread.current } } end def thread_proc(&block) Proc.new do block.call thread_terminated process_queue signal_done if done? end end end end
Version data entries
7 entries across 7 versions & 1 rubygems