Sha256: a4bdcfb91d7d86d1e8b500cb8872d6c9b861fe14639289f8c3e9e413611eb959

Contents?: true

Size: 1.81 KB

Versions: 10

Compression:

Stored size: 1.81 KB

Contents

# Inspired by https://github.com/meh/ruby-threadpool
require 'thread'

class ThreadPool

  class Job < Struct.new(:args, :block); end

  def initialize(min, max = nil)
    
    trap("INT") { shutdown }

    @min = min
    @max = max || min

    @cv = ConditionVariable.new
    @mutex = Mutex.new

    @queue = []
    @workers = []

    @spawned = 0
    @waiting = 0
    @shutdown = false
    @queue_locked = false

    @mutex.synchronize do
      min.times { spawn_thread }
    end
  end

  def execute(*args, &block)
    @mutex.synchronize do
      raise "Thread pool is about to shutdown" if @shutdown || @queue_locked

      @queue << Job.new(args, block)

      spawn_thread if @waiting == 0 && @spawned < @max

      @cv.signal
    end
  end
  alias :<< :execute

  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @cv.broadcast
    end

    @workers.first.join until @workers.empty?
  end

  def join
    @mutex.synchronize do
      @queue_locked = true
      @cv.broadcast
      sleep 0.01 until @queue.empty?
    end
    shutdown
  end

  protected

  def spawn_thread
    thread = Thread.new do
      continue = true

      while continue do
        job = nil

        @mutex.synchronize do
          while @queue.empty? && continue
            if @shutdown || @queue_locked
              continue = false
              break
            end

            @waiting += 1
            @cv.wait @mutex
            @waiting -= 1

            if @shutdown || @queue_locked
              continue = false
              break
            end
          end

          if continue
            job = @queue.shift
            job.block.call(*job.args) if job
          end
        end
      end

      @mutex.synchronize do
        @spawned -= 1
        @workers.delete thread
      end
    end

    @workers << thread
    thread
  end
end

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
filbunke-1.13.5 lib/filbunke/thread_pool.rb
filbunke-1.13.4 lib/filbunke/thread_pool.rb
filbunke-1.13.3 lib/filbunke/thread_pool.rb
filbunke-1.13.2 lib/filbunke/thread_pool.rb
filbunke-1.13.1 lib/filbunke/thread_pool.rb
filbunke-1.13.0 lib/filbunke/thread_pool.rb
filbunke-1.12.0 lib/filbunke/thread_pool.rb
filbunke-1.11.9 lib/filbunke/thread_pool.rb
filbunke-1.11.8 lib/filbunke/thread_pool.rb
filbunke-1.11.6 lib/filbunke/thread_pool.rb