Sha256: 04e342fe9748404e4d7991c0f2e97f7703e9b33477facc18784c07292c03092c

Contents?: true

Size: 1.11 KB

Versions: 2

Compression:

Stored size: 1.11 KB

Contents

module Rmega
  # A fixed-size pool of worker threads that execute submitted blocks.
  #
  # The number of workers is read from +options.thread_pool_size+ (the
  # +options+ helper is expected to come from the included Options module).
  #
  # Usage: call #process once per unit of work, then #wait_done to block
  # until every submitted block has run and the workers have terminated.
  class Pool
    include Options

    # Mutex guarding pool shutdown. Created in #initialize, before any worker
    # thread exists, so that every thread observes the same instance.
    attr_reader :mutex

    # Enables process-wide thread exception propagation and starts the
    # worker threads, which immediately begin waiting for jobs.
    def initialize
      threads_raises_exceptions

      @queue = Queue.new
      @queue_closed = false
      @mutex = Mutex.new

      @threads = options.thread_pool_size.times.map do
        Thread.new do
          # Queue#pop blocks while the queue is empty. It yields nil either
          # because the queue was closed and fully drained, or because
          # #wait_done pushed a nil sentinel; both mean "no more work".
          while (job = @queue.pop)
            job.call
          end
        end
      end
    end

    # Makes an exception raised in any thread abort the whole process
    # instead of being silently dropped. Note this is a global setting
    # (Thread.abort_on_exception), not limited to this pool's threads.
    def threads_raises_exceptions
      Thread.abort_on_exception = true
    end

    # Schedules +block+ to be run by one of the worker threads.
    #
    # Raises ArgumentError when no block is given: a nil entry in the queue
    # is the workers' stop signal, so enqueueing one by accident would
    # silently shrink the pool.
    #
    # Returns the underlying queue (the result of Queue#<<).
    def process(&block)
      raise ArgumentError, 'a block is required' unless block

      @queue << block
    end

    # Stops accepting work, blocks until every job already submitted has
    # finished, and returns once all worker threads have exited.
    #
    # Completion is detected by joining the workers rather than by waiting
    # on a condition variable, so the call also returns promptly when all
    # jobs finished before it was invoked or when nothing was ever
    # submitted. Calling it more than once is harmless.
    #
    # Returns the array of (now finished) worker threads.
    def wait_done
      mutex.synchronize do
        unless @queue_closed
          if @queue.respond_to?(:close)
            # Workers drain the remaining jobs, then pop returns nil.
            @queue.close
          else
            # Rubies without Queue#close: one nil sentinel per worker,
            # queued behind all pending jobs (the queue is FIFO).
            @threads.size.times { @queue << nil }
          end

          @queue_closed = true
        end
      end

      @threads.each(&:join)
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rmega-0.3.2 lib/rmega/pool.rb
rmega-0.3.1 lib/rmega/pool.rb