Sha256: 0dae2abf29a127a802549880f24800025de3a6fbb58ddf2d4701febf50640b84

Contents?: true

Size: 1.03 KB

Versions: 2

Compression:

Stored size: 1.03 KB

Contents

require 'thread'

module Rmega
  # A small bounded thread pool.
  #
  # Jobs are handed over as blocks through {#defer}; at most +max+ of them run
  # at the same time, each in its own thread, and the remaining ones wait in a
  # FIFO queue. {#wait_done} blocks the caller until every deferred job has
  # finished.
  #
  # All bookkeeping (+@queue+ and +@running+) is read and written only while
  # holding +@mutex+. Ruby's Mutex is not reentrant, so the private helpers are
  # split in two groups: the ones that take the lock themselves
  # (+process_queue+, +finish_current_thread+) and the ones that must be called
  # with the lock already held (+start_pending+, +idle?+).
  #
  # A job that raises is not rescued here: because of the
  # +Thread.abort_on_exception+ setting made in {#initialize}, the exception is
  # re-raised in the main thread.
  class Pool
    # Default number of jobs allowed to run concurrently.
    MAX = 4

    # @param max [Integer, nil] maximum number of concurrently running jobs;
    #   +nil+ falls back to {MAX}.
    def initialize(max = MAX)
      # NOTE: this is a process-wide switch, not a per-pool one: from now on an
      # unhandled exception in any thread is re-raised in the main thread.
      Thread.abort_on_exception = true

      @mutex = Mutex.new
      @resource = ConditionVariable.new
      @max = max || MAX

      @running = []
      @queue = []
    end

    # Enqueues a job and starts it right away when a slot is free.
    # Calling it without a block is a no-op.
    #
    # @yield the work to run in a pool thread
    # @return [nil]
    def defer(&block)
      # A nil entry in the queue would only waste a dispatch opportunity.
      return unless block

      synchronize { @queue << block }
      process_queue
    end

    # Blocks until no job is queued or running. Returns immediately when the
    # pool is already idle.
    #
    # @return [nil]
    def wait_done
      synchronize do
        # Waiting on a predicate (instead of waiting unconditionally) avoids
        # both a lost wakeup, when the jobs finished before this call, and
        # returning early on a spurious wakeup.
        @resource.wait(@mutex) until idle?
      end
    end

    private

    # Runs the given block while holding the pool mutex.
    def synchronize(&block)
      @mutex.synchronize(&block)
    end

    # Takes the lock and starts as many queued jobs as there are free slots.
    def process_queue
      synchronize { start_pending }
    end

    # Starts queued jobs until the pool is full or the queue is empty.
    # The caller must hold the mutex.
    def start_pending
      while @running.size < @max && !@queue.empty?
        # The thread is registered while the lock is held, so even a job that
        # finishes instantly cannot unregister itself before being registered.
        @running << Thread.new(&thread_proc(&@queue.shift))
      end
    end

    # True when nothing is queued and nothing is running.
    # The caller must hold the mutex.
    def idle?
      @queue.empty? && @running.empty?
    end

    # Called by a worker thread once its job has returned: frees its slot,
    # starts the next queued jobs and wakes up the waiters when the pool became
    # idle. Everything happens in one critical section so that no other thread
    # can observe an intermediate state.
    def finish_current_thread
      synchronize do
        @running.delete(Thread.current)
        start_pending
        # broadcast (not signal): several threads may be blocked in wait_done.
        @resource.broadcast if idle?
      end
    end

    # Wraps a job in the proc executed by its worker thread.
    def thread_proc(&block)
      Proc.new do
        block.call
        finish_current_thread
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rmega-0.1.7 lib/rmega/pool.rb
rmega-0.1.6 lib/rmega/pool.rb